You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/05 18:09:30 UTC
[1/2] beam git commit: Make batch loads repeatable across different invocations of the same template job.
Repository: beam
Updated Branches:
refs/heads/master 6543e56d2 -> ad2c1f1fc
Make batch loads repeatable across different invocations of the same template job.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94d8547d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94d8547d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94d8547d
Branch: refs/heads/master
Commit: 94d8547d604dc7291cb8a72a12453a3dfe6a5f9a
Parents: 6543e56
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 23 12:53:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 5 10:54:53 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 50 +++++++++++---------
.../io/gcp/bigquery/WriteBundlesToFiles.java | 12 ++---
2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index c1b202e..4393a63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -60,10 +60,14 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
class BatchLoads<DestinationT>
extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+ static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
+
// The maximum number of file writers to keep open in a single bundle at a time, since file
// writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
// The first 20 tables from a single BatchLoads transform will write files inline in the
@@ -161,28 +165,10 @@ class BatchLoads<DestinationT>
@Override
public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
- final String stepUuid = BigQueryHelpers.randomUUIDString();
-
- PCollectionView<String> tempFilePrefix =
- p.apply("Create", Create.of((Void) null))
- .apply(
- "GetTempFilePrefix",
- ParDo.of(
- new DoFn<Void, String>() {
- @ProcessElement
- public void getTempFilePrefix(ProcessContext c) {
- c.output(
- resolveTempLocation(
- c.getPipelineOptions().getTempLocation(),
- "BigQueryWriteTemp",
- stepUuid));
- }
- }))
- .apply("TempFilePrefixView", View.<String>asSingleton());
// Create a singleton job ID token at execution time. This will be used as the base for all
// load jobs issued from this instance of the transform.
- PCollectionView<String> jobIdTokenView =
+ final PCollection<String> jobIdToken =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
"CreateJobId",
@@ -190,10 +176,27 @@ class BatchLoads<DestinationT>
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
- return stepUuid;
+ return BigQueryHelpers.randomUUIDString();
+ }
+ }));
+ final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton());
+
+ PCollectionView<String> tempFilePrefix = jobIdToken
+ .apply(
+ "GetTempFilePrefix",
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void getTempFilePrefix(ProcessContext c) {
+ String tempLocation = resolveTempLocation(
+ c.getPipelineOptions().getTempLocation(),
+ "BigQueryWriteTemp", c.element());
+ LOG.info("Writing BigQuery temporary files to {} before loading them.",
+ tempLocation);
+ c.output(tempLocation);
}
}))
- .apply(View.<String>asSingleton());
+ .apply("TempFilePrefixView", View.<String>asSingleton());
PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
input.apply(
@@ -210,9 +213,10 @@ class BatchLoads<DestinationT>
new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
PCollectionTuple writeBundlesTuple = inputInGlobalWindow
.apply("WriteBundlesToFiles",
- ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag,
+ ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
maxNumWritersPerBundle, maxFileSize))
- .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
writeBundlesTuple.get(writtenFilesTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 0b5f54b..d68779a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,8 +19,6 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -41,6 +39,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +63,7 @@ class WriteBundlesToFiles<DestinationT>
// Map from tablespec to a writer for that table.
private transient Map<DestinationT, TableRowWriter> writers;
private transient Map<DestinationT, BoundedWindow> writerWindows;
- private final String stepUuid;
+ private final PCollectionView<String> tempFilePrefixView;
private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
private int maxNumWritersPerBundle;
private long maxFileSize;
@@ -131,11 +130,11 @@ class WriteBundlesToFiles<DestinationT>
}
WriteBundlesToFiles(
- String stepUuid,
+ PCollectionView<String> tempFilePrefixView,
TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
int maxNumWritersPerBundle,
long maxFileSize) {
- this.stepUuid = stepUuid;
+ this.tempFilePrefixView = tempFilePrefixView;
this.unwrittedRecordsTag = unwrittedRecordsTag;
this.maxNumWritersPerBundle = maxNumWritersPerBundle;
this.maxFileSize = maxFileSize;
@@ -159,8 +158,7 @@ class WriteBundlesToFiles<DestinationT>
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- String tempFilePrefix = resolveTempLocation(
- c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
+ String tempFilePrefix = c.sideInput(tempFilePrefixView);
DestinationT destination = c.element().getKey();
TableRowWriter writer;
[2/2] beam git commit: This closes #2657: [BEAM-2058] Generate BigQuery load job at runtime
Posted by jk...@apache.org.
This closes #2657: [BEAM-2058] Generate BigQuery load job at runtime
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad2c1f1f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad2c1f1f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad2c1f1f
Branch: refs/heads/master
Commit: ad2c1f1fc0fa3b65b8d5befae12eaf983f69cb2c
Parents: 6543e56 94d8547
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jun 5 10:56:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 5 10:56:01 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 50 +++++++++++---------
.../io/gcp/bigquery/WriteBundlesToFiles.java | 12 ++---
2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------