You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/16 22:19:37 UTC
[1/2] beam git commit: Add custom tempLocation support to BigQueryIO.
Repository: beam
Updated Branches:
refs/heads/master ec052bb44 -> a5cbd764b
Add custom tempLocation support to BigQueryIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec58a80c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec58a80c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec58a80c
Branch: refs/heads/master
Commit: ec58a80ca0f913c85d5f17cba3535243cd010876
Parents: ec052bb
Author: Yunqing Zhou <zh...@zhouyunqing-macbookpro.roam.corp.google.com>
Authored: Fri Oct 13 15:52:18 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 16 15:17:59 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 25 ++++++++--
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 49 ++++++++++++++++----
2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 6d832e4..1ccd5d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -127,11 +128,13 @@ class BatchLoads<DestinationT>
private long maxFileSize;
private int numFileShards;
private Duration triggeringFrequency;
+ private ValueProvider<String> customGcsTempLocation;
BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
boolean singletonTable,
DynamicDestinations<?, DestinationT> dynamicDestinations,
- Coder<DestinationT> destinationCoder) {
+ Coder<DestinationT> destinationCoder,
+ ValueProvider<String> customGcsTempLocation) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
@@ -142,6 +145,7 @@ class BatchLoads<DestinationT>
this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
this.triggeringFrequency = null;
+ this.customGcsTempLocation = customGcsTempLocation;
}
void setTestServices(BigQueryServices bigQueryServices) {
@@ -174,7 +178,16 @@ class BatchLoads<DestinationT>
@Override
public void validate(PipelineOptions options) {
// We will use a BigQuery load job -- validate the temp location.
- String tempLocation = options.getTempLocation();
+ String tempLocation;
+ if (customGcsTempLocation == null) {
+ tempLocation = options.getTempLocation();
+ } else {
+ if (!customGcsTempLocation.isAccessible()) {
+ // Can't perform verification in this case.
+ return;
+ }
+ tempLocation = customGcsTempLocation.get();
+ }
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
@@ -359,9 +372,15 @@ class BatchLoads<DestinationT>
new DoFn<String, String>() {
@ProcessElement
public void getTempFilePrefix(ProcessContext c) {
+ String tempLocationRoot;
+ if (customGcsTempLocation != null) {
+ tempLocationRoot = customGcsTempLocation.get();
+ } else {
+ tempLocationRoot = c.getPipelineOptions().getTempLocation();
+ }
String tempLocation =
resolveTempLocation(
- c.getPipelineOptions().getTempLocation(),
+ tempLocationRoot,
"BigQueryWriteTemp",
c.element());
LOG.info(
http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2f99643..3dfd8b8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -240,15 +240,6 @@ import org.slf4j.LoggerFactory;
* <p>For the most general form of dynamic table destinations and schemas, look at {@link
* BigQueryIO.Write#to(DynamicDestinations)}.
*
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
- * details.
- *
- * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
- * </a> for security and permission related information specific to BigQuery.
- *
* <h3>Insertion Method</h3>
*
* {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using
@@ -257,6 +248,30 @@ import org.slf4j.LoggerFactory;
* about the methods. The different insertion methods provide different tradeoffs of cost, quota,
* and data consistency; please see BigQuery documentation for more information about these
* tradeoffs.
+ *
+ * <h3>Usage with templates</h3>
+ *
+ * <p>When using {@link #read} or {@link #readTableRows()} in a template, it's required to specify
+ * {@link Read#withTemplateCompatibility()}. Specifying this in a non-template pipeline is not
+ * recommended because it has somewhat lower performance.
+ *
+ * <p>When using {@link #write()} or {@link #writeTableRows()} with batch loads in a template, it is
+ * recommended to specify {@link Write#withCustomGcsTempLocation}. Writing to BigQuery via batch
+ * loads involves writing temporary files to this location, so the location must be accessible at
+ * pipeline execution time. By default, this location is captured at pipeline <i>construction</i>
+ * time, and so may be inaccessible if the template is reused from a different project or at a moment
+ * when the original location no longer exists.
+ * {@link Write#withCustomGcsTempLocation(ValueProvider)} allows specifying the location as an
+ * argument to the template invocation.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
+ * </a> for security and permission related information specific to BigQuery.
*/
public class BigQueryIO {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
@@ -1031,6 +1046,8 @@ public class BigQueryIO {
@Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
+ @Nullable abstract ValueProvider<String> getCustomGcsTempLocation();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -1059,6 +1076,8 @@ public class BigQueryIO {
abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
+ abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation);
+
abstract Write<T> build();
}
@@ -1303,6 +1322,15 @@ public class BigQueryIO {
return toBuilder().setNumFileShards(numFileShards).build();
}
+ /**
+ * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery
+ * batch load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for
+ * discussion.
+ */
+ public Write<T> withCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation) {
+ return toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build();
+ }
+
@VisibleForTesting
Write<T> withTestServices(BigQueryServices testServices) {
return toBuilder().setBigQueryServices(testServices).build();
@@ -1479,7 +1507,8 @@ public class BigQueryIO {
getCreateDisposition(),
getJsonTableRef() != null,
dynamicDestinations,
- destinationCoder);
+ destinationCoder,
+ getCustomGcsTempLocation());
batchLoads.setTestServices(getBigQueryServices());
if (getMaxFilesPerBundle() != null) {
batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
[2/2] beam git commit: This closes #3995: Add custom gcs temp location support to BigQueryIO
Posted by jk...@apache.org.
This closes #3995: Add custom gcs temp location support to BigQueryIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5cbd764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5cbd764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5cbd764
Branch: refs/heads/master
Commit: a5cbd764bb81ca2aefe225d62e15e222a2266f0b
Parents: ec052bb ec58a80
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Oct 16 15:18:31 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 16 15:18:31 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 25 ++++++++--
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 49 ++++++++++++++++----
2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------