Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/14 21:32:53 UTC
[1/2] beam git commit: Allow users to choose the BigQuery insertion method.
Repository: beam
Updated Branches:
refs/heads/master 0f8e8dd70 -> f7e8f886c
Allow users to choose the BigQuery insertion method.
If choosing file load jobs on an unbounded PCollection,
a triggering frequency must be specified to control how
often load jobs are generated.
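For illustration, a minimal usage sketch of the API this commit adds. The pipeline, the
unbounded "quotes" PCollection, the "quoteSchema" TableSchema, the table spec, and the
specific frequency and shard values below are placeholders, not part of the commit:

  // Sketch only: "quotes" is an unbounded PCollection<TableRow>; quoteSchema is a
  // TableSchema built elsewhere; the table spec is a placeholder. FILE_LOADS on an
  // unbounded input requires withTriggeringFrequency (org.joda.time.Duration), and
  // withNumFileShards controls static sharding of the temporary files.
  quotes.apply(
      "WriteQuotes",
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.quotes")
          .withSchema(quoteSchema)
          .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
          .withTriggeringFrequency(Duration.standardMinutes(10))
          .withNumFileShards(100)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));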
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/075d4d45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/075d4d45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/075d4d45
Branch: refs/heads/master
Commit: 075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
Parents: 0f8e8dd
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Sun Jul 30 11:17:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Aug 14 14:32:10 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 447 +++++++++++++------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 168 ++++++-
.../sdk/io/gcp/bigquery/ReifyAsIterable.java | 51 +++
.../io/gcp/bigquery/WriteBundlesToFiles.java | 15 +-
.../sdk/io/gcp/bigquery/WritePartition.java | 13 +-
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 111 +++--
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 38 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 269 ++++++-----
8 files changed, 770 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index e46b1d3..0a1306d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import com.google.api.services.bigquery.model.TableRow;
@@ -26,9 +27,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -48,9 +50,15 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
@@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,6 +102,12 @@ class BatchLoads<DestinationT>
// The maximum size of a single file - 4TiB, just under the 5 TiB limit.
static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);
+ static final int DEFAULT_NUM_FILE_SHARDS = 0;
+
+ // If user triggering is supplied, we will trigger the file write after this many records are
+ // written.
+ static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+
// The maximum number of retries to poll the status of a job.
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -110,6 +125,8 @@ class BatchLoads<DestinationT>
private final Coder<DestinationT> destinationCoder;
private int maxNumWritersPerBundle;
private long maxFileSize;
+ private int numFileShards;
+ private Duration triggeringFrequency;
BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
boolean singletonTable,
@@ -123,6 +140,8 @@ class BatchLoads<DestinationT>
this.destinationCoder = destinationCoder;
this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
+ this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
+ this.triggeringFrequency = null;
}
void setTestServices(BigQueryServices bigQueryServices) {
@@ -139,6 +158,14 @@ class BatchLoads<DestinationT>
this.maxNumWritersPerBundle = maxNumWritersPerBundle;
}
+ public void setTriggeringFrequency(Duration triggeringFrequency) {
+ this.triggeringFrequency = triggeringFrequency;
+ }
+
+ public void setNumFileShards(int numFileShards) {
+ this.numFileShards = numFileShards;
+ }
+
@VisibleForTesting
void setMaxFileSize(long maxFileSize) {
this.maxFileSize = maxFileSize;
@@ -164,171 +191,323 @@ class BatchLoads<DestinationT>
}
}
- @Override
- public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
+ // Expand the pipeline when the user has requested periodically-triggered file writes.
+ private WriteResult expandTriggered(PCollection<KV<DestinationT, TableRow>> input) {
+ checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
+ final PCollectionView<String> jobIdTokenView = createJobIdView(p);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+ // The user-supplied triggeringDuration is often chosen to control how many BigQuery load
+ // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
+ // is set to a large value, currently we have to buffer all the data until the trigger fires.
+ // Instead we ensure that the files are written if a threshold number of records are ready.
+ // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
+ // offload the data to the filesystem.
+ PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
+ input.apply(
+ "rewindowIntoGlobal",
+ Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(triggeringFrequency),
+ AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
+ .discardingFiredPanes());
+ PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
+ writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+
+ // Apply the user's trigger before we start generating BigQuery load jobs.
+ results =
+ results.apply(
+ "applyUserTrigger",
+ Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(triggeringFrequency)))
+ .discardingFiredPanes());
- // Create a singleton job ID token at execution time. This will be used as the base for all
- // load jobs issued from this instance of the transform.
- final PCollection<String> jobIdToken =
- p.apply("TriggerIdCreation", Create.of("ignored"))
- .apply(
- "CreateJobId",
- MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return BigQueryHelpers.randomUUIDString();
- }
- }));
- final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton());
+ TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag");
+ TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag");
- PCollectionView<String> tempFilePrefix = jobIdToken
+ // If we have non-default triggered output, we can't use the side-input technique used in
+ // expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
+ // determinism.
+ PCollectionTuple partitions =
+ results
+ .apply(
+ "AttachSingletonKey",
+ WithKeys.<Void, WriteBundlesToFiles.Result<DestinationT>>of((Void) null))
+ .setCoder(
+ KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+ .apply("GroupOntoSingleton", GroupByKey.<Void, Result<DestinationT>>create())
+ .apply("ExtractResultValues", Values.<Iterable<Result<DestinationT>>>create())
.apply(
- "GetTempFilePrefix",
+ "WritePartitionTriggered",
ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void getTempFilePrefix(ProcessContext c) {
- String tempLocation = resolveTempLocation(
- c.getPipelineOptions().getTempLocation(),
- "BigQueryWriteTemp", c.element());
- LOG.info("Writing BigQuery temporary files to {} before loading them.",
- tempLocation);
- c.output(tempLocation);
- }
- }))
- .apply("TempFilePrefixView", View.<String>asSingleton());
+ new WritePartition<>(
+ singletonTable,
+ dynamicDestinations,
+ tempFilePrefixView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(tempFilePrefixView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+ PCollection<KV<TableDestination, String>> tempTables =
+ writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView);
+ tempTables
+ // Now that the load job has happened, we want the rename to happen immediately.
+ .apply(
+ Window.<KV<TableDestination, String>>into(new GlobalWindows())
+ .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
+ .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
+ .setCoder(
+ KvCoder.of(
+ VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+ .apply(GroupByKey.<Void, KV<TableDestination, String>>create())
+ .apply(Values.<Iterable<KV<TableDestination, String>>>create())
+ .apply(
+ "WriteRenameTriggered",
+ ParDo.of(
+ new WriteRename(
+ bigQueryServices, jobIdTokenView, writeDisposition, createDisposition))
+ .withSideInputs(jobIdTokenView));
+ writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView);
+ return writeResult(p);
+ }
+ // Expand the pipeline when the user has not requested periodically-triggered file writes.
+ public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
+ Pipeline p = input.getPipeline();
+ final PCollectionView<String> jobIdTokenView = createJobIdView(p);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
- PCollectionView<Map<DestinationT, String>> schemasView =
- inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations));
+ PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
+ (numFileShards == 0)
+ ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
+ : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+
+ TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+
+ // This transform will look at the set of files written for each table, and if any table has
+ // too many files or bytes, will partition that table's files into multiple partitions for
+ // loading.
+ PCollectionTuple partitions =
+ results
+ .apply("ReifyResults", new ReifyAsIterable<WriteBundlesToFiles.Result<DestinationT>>())
+ .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+ .apply(
+ "WritePartitionUntriggered",
+ ParDo.of(
+ new WritePartition<>(
+ singletonTable,
+ dynamicDestinations,
+ tempFilePrefixView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(tempFilePrefixView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+ PCollection<KV<TableDestination, String>> tempTables =
+ writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView);
+
+ tempTables
+ .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
+ .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+ .apply(
+ "WriteRenameUntriggered",
+ ParDo.of(
+ new WriteRename(
+ bigQueryServices, jobIdTokenView, writeDisposition, createDisposition))
+ .withSideInputs(jobIdTokenView));
+ writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView);
+ return writeResult(p);
+ }
+ // Generate the base job id string.
+ private PCollectionView<String> createJobIdView(Pipeline p) {
+ // Create a singleton job ID token at execution time. This will be used as the base for all
+ // load jobs issued from this instance of the transform.
+ return p.apply("JobIdCreationRoot", Create.of((Void) null))
+ .apply(
+ "CreateJobId",
+ MapElements.via(
+ new SimpleFunction<Void, String>() {
+ @Override
+ public String apply(Void input) {
+ return BigQueryHelpers.randomUUIDString();
+ }
+ }))
+ .apply(View.<String>asSingleton());
+ }
+
+ // Generate the temporary-file prefix.
+ private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
+ return ((PCollection<String>) jobIdView.getPCollection())
+ .apply(
+ "GetTempFilePrefix",
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void getTempFilePrefix(ProcessContext c) {
+ String tempLocation =
+ resolveTempLocation(
+ c.getPipelineOptions().getTempLocation(),
+ "BigQueryWriteTemp",
+ c.element());
+ LOG.info(
+ "Writing BigQuery temporary files to {} before loading them.",
+ tempLocation);
+ c.output(tempLocation);
+ }
+ }))
+ .apply("TempFilePrefixView", View.<String>asSingleton());
+ }
+
+ // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
+ // file byte size, and table destination.
+ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
+ PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
- new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles"){};
+ new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag =
new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
- PCollectionTuple writeBundlesTuple = inputInGlobalWindow
- .apply("WriteBundlesToFiles",
- ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
- maxNumWritersPerBundle, maxFileSize))
- .withSideInputs(tempFilePrefix)
- .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+ PCollectionTuple writeBundlesTuple =
+ input.apply(
+ "WriteBundlesToFiles",
+ ParDo.of(
+ new WriteBundlesToFiles<>(
+ tempFilePrefix, unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize))
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
- writeBundlesTuple.get(writtenFilesTag)
- .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ writeBundlesTuple
+ .get(writtenFilesTag)
+ .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ PCollection<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecords =
+ writeBundlesTuple
+ .get(unwrittedRecordsTag)
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()));
// If the bundles contain too many output tables to be written inline to files (due to memory
// limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
// Group these records by key, and write the files after grouping. Since the record is grouped
// by key, we can ensure that only one file is open at a time in each bundle.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped =
- writeBundlesTuple
- .get(unwrittedRecordsTag)
- .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()))
- .apply(GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
- .apply(
- ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
- .withSideInputs(tempFilePrefix))
- .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ writeShardedRecords(unwrittenRecords, tempFilePrefix);
// PCollection of filename, file byte size, and table destination.
- PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
- PCollectionList.of(writtenFiles).and(writtenFilesGrouped)
- .apply(Flatten.<Result<DestinationT>>pCollections());
+ return PCollectionList.of(writtenFiles)
+ .and(writtenFilesGrouped)
+ .apply("FlattenFiles", Flatten.<Result<DestinationT>>pCollections())
+ .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ }
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
- new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
- new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+ // Writes input data to statically-sharded files. Returns a PCollection of filename,
+ // file byte size, and table destination.
+ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
+ PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
+ checkState(numFileShards > 0);
+ PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords =
+ input
+ .apply(
+ "AddShard",
+ ParDo.of(
+ new DoFn<KV<DestinationT, TableRow>, KV<ShardedKey<DestinationT>, TableRow>>() {
+ int shardNumber;
- // Turn the list of files and record counts in a PCollectionView that can be used as a
- // side input.
- PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView =
- results.apply("ResultsView",
- View.<WriteBundlesToFiles.Result<DestinationT>>asIterable());
- // This transform will look at the set of files written for each table, and if any table has
- // too many files or bytes, will partition that table's files into multiple partitions for
- // loading.
- PCollection<Void> singleton = p.apply("singleton",
- Create.of((Void) null).withCoder(VoidCoder.of()));
- PCollectionTuple partitions =
- singleton.apply(
- "WritePartition",
- ParDo.of(
- new WritePartition<>(
- singletonTable,
- dynamicDestinations,
- tempFilePrefix,
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(tempFilePrefix, resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
- List<PCollectionView<?>> writeTablesSideInputs =
- Lists.newArrayList(jobIdTokenView, schemasView);
- writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs());
+ @Setup
+ public void setup() {
+ shardNumber = ThreadLocalRandom.current().nextInt(numFileShards);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ DestinationT destination = c.element().getKey();
+ TableRow tableRow = c.element().getValue();
+ c.output(
+ KV.of(
+ ShardedKey.of(destination, ++shardNumber % numFileShards),
+ tableRow));
+ }
+ }))
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()));
+
+ return writeShardedRecords(shardedRecords, tempFilePrefix);
+ }
+
+ private PCollection<Result<DestinationT>> writeShardedRecords(
+ PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords,
+ PCollectionView<String> tempFilePrefix) {
+ return shardedRecords
+ .apply("GroupByDestination", GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
+ .withSideInputs(tempFilePrefix))
+ .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+ }
+
+ // Take in a list of files and write them to temporary tables.
+ private PCollection<KV<TableDestination, String>> writeTempTables(
+ PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+ PCollectionView<String> jobIdTokenView) {
+ List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView);
+ sideInputs.addAll(dynamicDestinations.getSideInputs());
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
- KvCoder.of(
- ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
- ListCoder.of(StringUtf8Coder.of()));
+ KvCoder.of(
+ ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
+ ListCoder.of(StringUtf8Coder.of()));
// If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
// specified in multiPartitionsTag.
- PCollection<KV<TableDestination, String>> tempTables =
- partitions
- .get(multiPartitionsTag)
- .setCoder(partitionsCoder)
- // Reshuffle will distribute this among multiple workers, and also guard against
- // reexecution of the WritePartitions step once WriteTables has begun.
- .apply(
- "MultiPartitionsReshuffle",
- Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
- .apply(
- "MultiPartitionsWriteTables",
- ParDo.of(
- new WriteTables<>(
- false,
- bigQueryServices,
- jobIdTokenView,
- schemasView,
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- dynamicDestinations))
- .withSideInputs(writeTablesSideInputs));
-
- // This view maps each final table destination to the set of temporary partitioned tables
- // the PCollection was loaded into.
- PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
- tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap());
-
- singleton.apply(
- "WriteRename",
- ParDo.of(
- new WriteRename(
- bigQueryServices,
- jobIdTokenView,
- writeDisposition,
- createDisposition,
- tempTablesView))
- .withSideInputs(tempTablesView, jobIdTokenView));
+ return input
+ .setCoder(partitionsCoder)
+ // Reshuffle will distribute this among multiple workers, and also guard against
+ // reexecution of the WritePartitions step once WriteTables has begun.
+ .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
+ .apply(
+ "MultiPartitionsWriteTables",
+ ParDo.of(
+ new WriteTables<>(
+ false,
+ bigQueryServices,
+ jobIdTokenView,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ dynamicDestinations))
+ .withSideInputs(sideInputs));
+ }
+ // In the case where the files fit into a single load job, there's no need to write temporary
+ // tables and rename. We can load these files directly into the target BigQuery table.
+ void writeSinglePartition(
+ PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+ PCollectionView<String> jobIdTokenView) {
+ List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView);
+ sideInputs.addAll(dynamicDestinations.getSideInputs());
+ Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+ KvCoder.of(
+ ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
+ ListCoder.of(StringUtf8Coder.of()));
// Write single partition to final table
- partitions
- .get(singlePartitionTag)
+ input
.setCoder(partitionsCoder)
// Reshuffle will distribute this among multiple workers, and also guard against
// reexecution of the WritePartitions step once WriteTables has begun.
- .apply(
- "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
+ .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
.apply(
"SinglePartitionWriteTables",
ParDo.of(
@@ -336,14 +515,20 @@ class BatchLoads<DestinationT>
true,
bigQueryServices,
jobIdTokenView,
- schemasView,
writeDisposition,
createDisposition,
dynamicDestinations))
- .withSideInputs(writeTablesSideInputs));
+ .withSideInputs(sideInputs));
+ }
+ private WriteResult writeResult(Pipeline p) {
PCollection<TableRow> empty =
p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
- return WriteResult.in(input.getPipeline(), new TupleTag<TableRow>("failedInserts"), empty);
+ return WriteResult.in(p, new TupleTag<TableRow>("failedInserts"), empty);
+ }
+
+ @Override
+ public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
+ return (triggeringFrequency != null) ? expandTriggered(input) : expandUntriggered(input);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6edbd06..feb085d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -92,19 +92,20 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link PTransform}s for reading and writing
- * <a href="https://developers.google.com/bigquery/">BigQuery</a> tables.
+ * {@link PTransform}s for reading and writing <a
+ * href="https://developers.google.com/bigquery/">BigQuery</a> tables.
*
* <h3>Table References</h3>
*
* <p>A fully-qualified BigQuery table name consists of three components:
+ *
* <ul>
- * <li>{@code projectId}: the Cloud project id (defaults to
- * {@link GcpOptions#getProject()}).
+ * <li>{@code projectId}: the Cloud project id (defaults to {@link GcpOptions#getProject()}).
* <li>{@code datasetId}: the BigQuery dataset id, unique within a project.
* <li>{@code tableId}: a table id, unique within a dataset.
* </ul>
@@ -122,8 +123,8 @@ import org.slf4j.LoggerFactory;
*
* <h3>Reading</h3>
*
- * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
- * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
+ * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. This produces a
+ * {@link PCollection} of {@link TableRow TableRows} as output:
*
* <pre>{@code
* PCollection<TableRow> weatherData = pipeline.apply(
@@ -146,12 +147,11 @@ import org.slf4j.LoggerFactory;
*
* <h3>Writing</h3>
*
- * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation.
- * This consumes either a {@link PCollection} of {@link TableRow TableRows} as input when using
- * {@link BigQueryIO#writeTableRows()} or of a user-defined type when using
- * {@link BigQueryIO#write()}. When using a user-defined type, a function must be provided to
- * turn this type into a {@link TableRow} using
- * {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
+ * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes
+ * either a {@link PCollection} of {@link TableRow TableRows} as input when using {@link
+ * BigQueryIO#writeTableRows()} or of a user-defined type when using {@link BigQueryIO#write()}.
+ * When using a user-defined type, a function must be provided to turn this type into a {@link
+ * TableRow} using {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
*
* <pre>{@code
* PCollection<TableRow> quotes = ...
@@ -216,8 +216,8 @@ import org.slf4j.LoggerFactory;
* can also be useful when writing to a single table, as it allows a previous stage to calculate the
* schema (possibly based on the full collection of records being written to BigQuery).
*
- * <p>For the most general form of dynamic table destinations and schemas, look at
- * {@link BigQueryIO.Write#to(DynamicDestinations)}.
+ * <p>For the most general form of dynamic table destinations and schemas, look at {@link
+ * BigQueryIO.Write#to(DynamicDestinations)}.
*
* <h3>Permissions</h3>
*
@@ -227,6 +227,15 @@ import org.slf4j.LoggerFactory;
*
* <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
* </a> for security and permission related information specific to BigQuery.
+ *
+ * <h3>Insertion Method</h3>
+ *
+ * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using
+ * {@link BigQueryIO.Write#withMethod}. If no method is supplied, then a default method will be
+ * chosen based on the input PCollection. See {@link BigQueryIO.Write.Method} for more information
+ * about the methods. The different insertion methods provide different tradeoffs of cost, quota,
+ * and data consistency; please see BigQuery documentation for more information about these
+ * tradeoffs.
*/
public class BigQueryIO {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
@@ -757,6 +766,8 @@ public class BigQueryIO {
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
+ .setNumFileShards(0)
+ .setMethod(Write.Method.DEFAULT)
.build();
}
@@ -771,6 +782,41 @@ public class BigQueryIO {
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
+ /** Determines the method used to insert data in BigQuery. */
+ public enum Method {
+ /**
+ * The default behavior if no method is explicitly set. If the input is bounded, then file
+ * loads will be used. If the input is unbounded, then streaming inserts will be used.
+ */
+ DEFAULT,
+
+ /**
+ * Use BigQuery load jobs to insert data. Records will first be written to files, and these
+ * files will be loaded into BigQuery. This is the default method when the input is bounded.
+ * This method can be chosen for unbounded inputs as well, as long as a triggering frequency
+ * is also set using {@link #withTriggeringFrequency}. BigQuery has daily quotas on the number
+ * of load jobs allowed, so be careful not to set the triggering frequency too high.
+ * For more information, see <a
+ * href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from
+ * Cloud Storage</a>.
+ */
+ FILE_LOADS,
+
+ /**
+ * Use the BigQuery streaming insert API to insert data. This provides the lowest-latency
+ * insert path into BigQuery, and therefore is the default method when the input is unbounded.
+ * BigQuery will make a strong effort to ensure no duplicates when using this path, however
+ * there are some scenarios in which BigQuery is unable to make this guarantee (see
+ * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over
+ * the output table to periodically clean these rare duplicates. Alternatively, using the
+ * {@link #FILE_LOADS} insert method does guarantee no duplicates, though the latency for the
+ * insert into BigQuery will be much higher. For more information, see <a
+ * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
+ * BigQuery</a>.
+ */
+ STREAMING_INSERTS
+ }
+
@Nullable abstract ValueProvider<String> getJsonTableRef();
@Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
getTableFunction();
@@ -787,6 +833,14 @@ public class BigQueryIO {
abstract BigQueryServices getBigQueryServices();
@Nullable abstract Integer getMaxFilesPerBundle();
@Nullable abstract Long getMaxFileSize();
+
+ abstract int getNumFileShards();
+
+ @Nullable
+ abstract Duration getTriggeringFrequency();
+
+ abstract Method getMethod();
+
@Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
abstract Builder<T> toBuilder();
@@ -807,6 +861,13 @@ public class BigQueryIO {
abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);
abstract Builder<T> setMaxFileSize(Long maxFileSize);
+
+ abstract Builder<T> setNumFileShards(int numFileShards);
+
+ abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency);
+
+ abstract Builder<T> setMethod(Method method);
+
abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
abstract Write<T> build();
@@ -992,6 +1053,40 @@ public class BigQueryIO {
return toBuilder().setValidate(false).build();
}
+ /**
+ * Choose the method used to write data to BigQuery. See the Javadoc on {@link Method} for
+ * information and restrictions of the different methods.
+ */
+ public Write<T> withMethod(Method method) {
+ return toBuilder().setMethod(method).build();
+ }
+
+ /**
+ * Choose the frequency at which file writes are triggered.
+ *
+ * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, and
+ * only when writing an unbounded {@link PCollection}.
+ *
+ * <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data
+ * written since the last load job. BigQuery has limits on how many load jobs can be triggered
+ * per day, so be careful not to set this duration too low, or you may exceed daily quota. Often
+ * this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery
+ * quota. See <a href="https://cloud.google.com/bigquery/quota-policy">Quota Policy</a> for more
+ * information about BigQuery quotas.
+ */
+ public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {
+ return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+ }
+
+ /**
+ * Control how many file shards are written when using BigQuery load jobs. Applicable only when
+ * also setting {@link #withTriggeringFrequency}. The default value is 1000.
+ */
+ @Experimental
+ public Write<T> withNumFileShards(int numFileShards) {
+ return toBuilder().setNumFileShards(numFileShards).build();
+ }
+
@VisibleForTesting
Write<T> withTestServices(BigQueryServices testServices) {
return toBuilder().setBigQueryServices(testServices).build();
@@ -1029,6 +1124,17 @@ public class BigQueryIO {
}
}
+ private Method resolveMethod(PCollection<T> input) {
+ if (getMethod() != Method.DEFAULT) {
+ return getMethod();
+ }
+ // By default, when writing an Unbounded PCollection, we use StreamingInserts and
+ // BigQuery's streaming import API.
+ return (input.isBounded() == IsBounded.UNBOUNDED)
+ ? Method.STREAMING_INSERTS
+ : Method.FILE_LOADS;
+ }
+
@Override
public WriteResult expand(PCollection<T> input) {
// We must have a destination to write to!
@@ -1048,6 +1154,7 @@ public class BigQueryIO {
|| getSchemaFromView() != null,
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
List<?> allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(),
getDynamicDestinations());
checkArgument(1
@@ -1061,7 +1168,21 @@ public class BigQueryIO {
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may "
+ "be set");
-
+ Method method = resolveMethod(input);
+ if (input.isBounded() == IsBounded.UNBOUNDED && method == Method.FILE_LOADS) {
+ checkArgument(
+ getTriggeringFrequency() != null,
+ "When writing an unbounded PCollection via FILE_LOADS, "
+ + "triggering frequency must be specified");
+ } else {
+ checkArgument(
+ getTriggeringFrequency() == null && getNumFileShards() == 0,
+ "Triggering frequency or number of file shards can be specified only when writing "
+ + "an unbounded PCollection via FILE_LOADS, but: the collection was %s "
+ + "and the method was %s",
+ input.isBounded(),
+ method);
+ }
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
if (getJsonTableRef() != null) {
@@ -1069,17 +1190,20 @@ public class BigQueryIO {
DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
getJsonTableRef(), getTableDescription());
} else if (getTableFunction() != null) {
- dynamicDestinations = new TableFunctionDestinations(getTableFunction());
+ dynamicDestinations = new TableFunctionDestinations<>(getTableFunction());
}
// Wrap with a DynamicDestinations class that will provide a schema. There might be no
// schema provided if the create disposition is CREATE_NEVER.
if (getJsonSchema() != null) {
dynamicDestinations =
- new ConstantSchemaDestinations(dynamicDestinations, getJsonSchema());
+ new ConstantSchemaDestinations<>(
+ (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());
} else if (getSchemaFromView() != null) {
dynamicDestinations =
- new SchemaFromViewDestinations(dynamicDestinations, getSchemaFromView());
+ new SchemaFromViewDestinations<>(
+ (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+ getSchemaFromView());
}
}
return expandTyped(input, dynamicDestinations);
@@ -1100,9 +1224,9 @@ public class BigQueryIO {
.apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
.setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
- // When writing an Unbounded PCollection, we use StreamingInserts and BigQuery's streaming
- // import API.
- if (input.isBounded() == IsBounded.UNBOUNDED) {
+ Method method = resolveMethod(input);
+
+ if (method == Method.STREAMING_INSERTS) {
checkArgument(
getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
@@ -1129,6 +1253,8 @@ public class BigQueryIO {
if (getMaxFileSize() != null) {
batchLoads.setMaxFileSize(getMaxFileSize());
}
+ batchLoads.setTriggeringFrequency(getTriggeringFrequency());
+ batchLoads.setNumFileShards(getNumFileShards());
return rowsWithDestination.apply(batchLoads);
}
}
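As a hedged illustration of the validation added in Write.expand() above (the bounded
"boundedRows" PCollection and the table spec are placeholders): a triggering frequency is
accepted only for FILE_LOADS on an unbounded input, so a write like the following is
expected to fail the new checkArgument at pipeline-construction time:

  // Sketch only: boundedRows is a bounded PCollection<TableRow>. Method.DEFAULT resolves
  // to FILE_LOADS for bounded input, but withTriggeringFrequency is not allowed there,
  // so expand() throws IllegalArgumentException.
  boundedRows.apply(
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.table")
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
          .withTriggeringFrequency(Duration.standardMinutes(5)));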
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
new file mode 100644
index 0000000..18a359c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * This transform turns a side input into a singleton PCollection that can be used as the main
+ * input for another transform.
+ */
+public class ReifyAsIterable<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+ @Override
+ public PCollection<Iterable<T>> expand(PCollection<T> input) {
+ final PCollectionView<Iterable<T>> view = input.apply(View.<T>asIterable());
+ return input
+ .getPipeline()
+ .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+ .apply(
+ ParDo.of(
+ new DoFn<Void, Iterable<T>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.sideInput(view));
+ }
+ })
+ .withSideInputs(view));
+ }
+}
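For context, a hedged usage sketch of ReifyAsIterable with a plain String element type; the
"fileNames" PCollection is a placeholder, and the coder is set explicitly the same way the
BatchLoads changes above do:

  // Sketch only: collapse a PCollection<String> into a single-element
  // PCollection<Iterable<String>> so a downstream ParDo can read the whole list as its
  // main input (mirrors the "ReifyResults" / "ReifyRenameInput" steps in BatchLoads).
  PCollection<Iterable<String>> allFileNames =
      fileNames
          .apply("ReifyFileNames", new ReifyAsIterable<String>())
          .setCoder(IterableCoder.of(StringUtf8Coder.of()));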
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e1ed746..e337f94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -66,9 +66,10 @@ class WriteBundlesToFiles<DestinationT>
private transient Map<DestinationT, TableRowWriter> writers;
private transient Map<DestinationT, BoundedWindow> writerWindows;
private final PCollectionView<String> tempFilePrefixView;
- private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
+ private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag;
private int maxNumWritersPerBundle;
private long maxFileSize;
+ private int spilledShardNumber;
/**
* The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -133,11 +134,11 @@ class WriteBundlesToFiles<DestinationT>
WriteBundlesToFiles(
PCollectionView<String> tempFilePrefixView,
- TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
+ TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag,
int maxNumWritersPerBundle,
long maxFileSize) {
this.tempFilePrefixView = tempFilePrefixView;
- this.unwrittedRecordsTag = unwrittedRecordsTag;
+ this.unwrittenRecordsTag = unwrittenRecordsTag;
this.maxNumWritersPerBundle = maxNumWritersPerBundle;
this.maxFileSize = maxFileSize;
}
@@ -148,6 +149,7 @@ class WriteBundlesToFiles<DestinationT>
// bundles.
this.writers = Maps.newHashMap();
this.writerWindows = Maps.newHashMap();
+ this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
}
TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix,
@@ -174,9 +176,10 @@ class WriteBundlesToFiles<DestinationT>
} else {
// This means that we already had too many writers open in this bundle. "spill" this record
// into the output. It will be grouped and written to a file in a subsequent stage.
- c.output(unwrittedRecordsTag,
- KV.of(ShardedKey.of(destination,
- ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR)),
+ c.output(
+ unwrittenRecordsTag,
+ KV.of(
+ ShardedKey.of(destination, (++spilledShardNumber) % SPILLED_RECORD_SHARDING_FACTOR),
c.element().getValue()));
return;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 451d1bd..934f1ae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
@@ -34,12 +35,13 @@ import org.apache.beam.sdk.values.TupleTag;
* tablespec and the list of files corresponding to each partition of that table.
*/
class WritePartition<DestinationT>
- extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> {
+ extends DoFn<
+ Iterable<WriteBundlesToFiles.Result<DestinationT>>,
+ KV<ShardedKey<DestinationT>, List<String>>> {
private final boolean singletonTable;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final PCollectionView<String> tempFilePrefix;
- private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results;
- private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
+ @Nullable private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
private static class PartitionData {
@@ -104,12 +106,10 @@ class WritePartition<DestinationT>
boolean singletonTable,
DynamicDestinations<?, DestinationT> dynamicDestinations,
PCollectionView<String> tempFilePrefix,
- PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results,
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) {
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
- this.results = results;
this.tempFilePrefix = tempFilePrefix;
this.multiPartitionsTag = multiPartitionsTag;
this.singlePartitionTag = singlePartitionTag;
@@ -117,8 +117,7 @@ class WritePartition<DestinationT>
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- List<WriteBundlesToFiles.Result<DestinationT>> results =
- Lists.newArrayList(c.sideInput(this.results));
+ List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element());
// If there are no elements to write _and_ the user specified a constant output table, then
// generate an empty table of that name.
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index f641b32..eb1da5f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -22,9 +22,11 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -35,74 +37,85 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Copies temporary tables to destination table.
+ * Copies temporary tables to destination table. The input element is an {@link Iterable} that
+ * provides the list of all temporary tables created for a given {@link TableDestination}.
*/
-class WriteRename extends DoFn<Void, Void> {
+class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
- private final WriteDisposition writeDisposition;
- private final CreateDisposition createDisposition;
- // Map from final destination to a list of temporary tables that need to be copied into it.
- private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView;
+ // In the triggered scenario, the user-supplied create and write dispositions only apply to the
+ // first trigger pane, as that's when the table is created. Subsequent loads should always
+ // append to the table, and so use CREATE_NEVER and WRITE_APPEND dispositions respectively.
+ private final WriteDisposition firstPaneWriteDisposition;
+ private final CreateDisposition firstPaneCreateDisposition;
public WriteRename(
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView) {
+ CreateDisposition createDisposition) {
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
- this.writeDisposition = writeDisposition;
- this.createDisposition = createDisposition;
- this.tempTablesView = tempTablesView;
+ this.firstPaneWriteDisposition = writeDisposition;
+ this.firstPaneCreateDisposition = createDisposition;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- Map<TableDestination, Iterable<String>> tempTablesMap =
- Maps.newHashMap(c.sideInput(tempTablesView));
-
- // Process each destination table.
- for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) {
- TableDestination finalTableDestination = entry.getKey();
- List<String> tempTablesJson = Lists.newArrayList(entry.getValue());
- // Do not copy if no temp tables are provided
- if (tempTablesJson.size() == 0) {
- return;
- }
+ Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
+ for (KV<TableDestination, String> entry : c.element()) {
+ tempTables.put(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
+ // Process each destination table.
+ writeRename(entry.getKey(), entry.getValue(), c);
+ }
+ }
- List<TableReference> tempTables = Lists.newArrayList();
- for (String table : tempTablesJson) {
- tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
- }
+ private void writeRename(
+ TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
+ throws Exception {
+ WriteDisposition writeDisposition =
+ (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+ CreateDisposition createDisposition =
+ (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+ List<String> tempTablesJson = Lists.newArrayList(tempTableNames);
+ // Do not copy if no temp tables are provided
+ if (tempTablesJson.size() == 0) {
+ return;
+ }
- // Make sure each destination table gets a unique job id.
- String jobIdPrefix = BigQueryHelpers.createJobId(
- c.sideInput(jobIdToken), finalTableDestination, -1);
-
- copy(
- bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- jobIdPrefix,
- finalTableDestination.getTableReference(),
- tempTables,
- writeDisposition,
- createDisposition,
- finalTableDestination.getTableDescription());
-
- DatasetService tableService =
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
- removeTemporaryTables(tableService, tempTables);
+ List<TableReference> tempTables = Lists.newArrayList();
+ for (String table : tempTablesJson) {
+ tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
}
+
+ // Make sure each destination table gets a unique job id.
+ String jobIdPrefix =
+ BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1);
+
+ copy(
+ bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ jobIdPrefix,
+ finalTableDestination.getTableReference(),
+ tempTables,
+ writeDisposition,
+ createDisposition,
+ finalTableDestination.getTableDescription());
+
+ DatasetService tableService =
+ bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+ removeTemporaryTables(tableService, tempTables);
}
private void copy(
@@ -174,9 +187,11 @@ class WriteRename extends DoFn<Void, Void> {
super.populateDisplayData(builder);
builder
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
- .withLabel("Write Disposition"))
- .add(DisplayData.item("createDisposition", createDisposition.toString())
- .withLabel("Create Disposition"));
+ .add(
+ DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString())
+ .withLabel("Write Disposition"))
+ .add(
+ DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
+ .withLabel("Create Disposition"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ed2916..24911a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -65,35 +66,48 @@ class WriteTables<DestinationT>
private final boolean singlePartition;
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
- private final PCollectionView<Map<DestinationT, String>> schemasView;
- private final WriteDisposition writeDisposition;
- private final CreateDisposition createDisposition;
+ private final WriteDisposition firstPaneWriteDisposition;
+ private final CreateDisposition firstPaneCreateDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+ private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
public WriteTables(
boolean singlePartition,
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
- PCollectionView<Map<DestinationT, String>> schemasView,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.singlePartition = singlePartition;
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
- this.schemasView = schemasView;
- this.writeDisposition = writeDisposition;
- this.createDisposition = createDisposition;
+ this.firstPaneWriteDisposition = writeDisposition;
+ this.firstPaneCreateDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
}
+ @StartBundle
+ public void startBundle(StartBundleContext c) {
+ // Clear the map on each bundle so we can notice side-input updates.
+ // (an alternative is to use a cache with a TTL).
+ jsonSchemas.clear();
+ }
+
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
DestinationT destination = c.element().getKey().getKey();
- TableSchema tableSchema =
- BigQueryHelpers.fromJsonString(
- c.sideInput(schemasView).get(destination), TableSchema.class);
+ TableSchema tableSchema;
+ String jsonSchema = jsonSchemas.get(destination);
+ if (jsonSchema != null) {
+ tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+ } else {
+ tableSchema = dynamicDestinations.getSchema(destination);
+ if (tableSchema != null) {
+ jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
+ }
+ }
+
TableDestination tableDestination = dynamicDestinations.getTable(destination);
TableReference tableReference = tableDestination.getTableReference();
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
@@ -112,6 +126,10 @@ class WriteTables<DestinationT>
tableReference.setTableId(jobIdPrefix);
}
+ WriteDisposition writeDisposition =
+ (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+ CreateDisposition createDisposition =
+ (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
load(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
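
Two details of the WriteTables change above are worth calling out: the per-destination schema is now cached in a bundle-scoped map instead of being read from a side input, and the write/create dispositions are relaxed after the first pane so that repeated load jobs append into a table the first pane already created. The standalone sketch below restates that pattern with hypothetical SchemaProvider and disposition types; it is an illustration of the logic, not the Beam implementation.

import java.util.HashMap;
import java.util.Map;

// Illustration of bundle-scoped schema caching plus pane-dependent dispositions.
// SchemaProvider and the enums are hypothetical; only the control flow mirrors the diff.
public class PaneAwareLoadSketch<DestinationT> {

  interface SchemaProvider<D> {
    String getJsonSchema(D destination); // may return null if no schema is known
  }

  enum WriteDisposition { WRITE_EMPTY, WRITE_TRUNCATE, WRITE_APPEND }
  enum CreateDisposition { CREATE_IF_NEEDED, CREATE_NEVER }

  private final Map<DestinationT, String> jsonSchemas = new HashMap<>();
  private final WriteDisposition firstPaneWrite;
  private final CreateDisposition firstPaneCreate;

  PaneAwareLoadSketch(WriteDisposition firstPaneWrite, CreateDisposition firstPaneCreate) {
    this.firstPaneWrite = firstPaneWrite;
    this.firstPaneCreate = firstPaneCreate;
  }

  // Called at the start of every bundle so that schema updates are eventually observed.
  void startBundle() {
    jsonSchemas.clear();
  }

  void processElement(DestinationT destination, long paneIndex, SchemaProvider<DestinationT> schemas) {
    // Look up the schema at most once per destination per bundle.
    String jsonSchema = jsonSchemas.get(destination);
    if (jsonSchema == null) {
      jsonSchema = schemas.getJsonSchema(destination);
      if (jsonSchema != null) {
        jsonSchemas.put(destination, jsonSchema);
      }
    }

    // The user-configured dispositions apply only to the first pane; later panes
    // append into the table that the first load job already created.
    WriteDisposition write = (paneIndex == 0) ? firstPaneWrite : WriteDisposition.WRITE_APPEND;
    CreateDisposition create = (paneIndex == 0) ? firstPaneCreate : CreateDisposition.CREATE_NEVER;

    issueLoadJob(destination, jsonSchema, write, create);
  }

  private void issueLoadJob(
      DestinationT destination, String jsonSchema, WriteDisposition write, CreateDisposition create) {
    // Placeholder for the actual BigQuery load job submission.
    System.out.printf("load %s schema=%s write=%s create=%s%n", destination, jsonSchema, write, create);
  }
}
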
http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 8db4e94..3d53b7e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -47,12 +47,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
@@ -80,7 +82,6 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -91,6 +92,7 @@ import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
@@ -105,6 +107,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -130,20 +135,19 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -540,65 +544,73 @@ public class BigQueryIOTest implements Serializable {
if (streaming) {
users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
}
- users.apply("WriteBigQuery", BigQueryIO.<String>write()
+ users.apply(
+ "WriteBigQuery",
+ BigQueryIO.<String>write()
.withTestServices(fakeBqServices)
.withMaxFilesPerBundle(5)
.withMaxFileSize(10)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withFormatFunction(new SerializableFunction<String, TableRow>() {
- @Override
- public TableRow apply(String user) {
- Matcher matcher = userPattern.matcher(user);
- if (matcher.matches()) {
- return new TableRow().set("name", matcher.group(1))
- .set("id", Integer.valueOf(matcher.group(2)));
- }
- throw new RuntimeException("Unmatching element " + user);
- }
- })
- .to(new StringIntegerDestinations() {
- @Override
- public Integer getDestination(ValueInSingleWindow<String> element) {
- assertThat(element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
- Matcher matcher = userPattern.matcher(element.getValue());
- if (matcher.matches()) {
- // Since we name tables by userid, we can simply store an Integer to represent
- // a table.
- return Integer.valueOf(matcher.group(2));
- }
- throw new RuntimeException("Unmatching destination " + element.getValue());
- }
+ .withFormatFunction(
+ new SerializableFunction<String, TableRow>() {
+ @Override
+ public TableRow apply(String user) {
+ Matcher matcher = userPattern.matcher(user);
+ if (matcher.matches()) {
+ return new TableRow()
+ .set("name", matcher.group(1))
+ .set("id", Integer.valueOf(matcher.group(2)));
+ }
+ throw new RuntimeException("Unmatching element " + user);
+ }
+ })
+ .to(
+ new StringIntegerDestinations() {
+ @Override
+ public Integer getDestination(ValueInSingleWindow<String> element) {
+ assertThat(
+ element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
+ Matcher matcher = userPattern.matcher(element.getValue());
+ if (matcher.matches()) {
+ // Since we name tables by userid, we can simply store an Integer to represent
+ // a table.
+ return Integer.valueOf(matcher.group(2));
+ }
+ throw new RuntimeException("Unmatching destination " + element.getValue());
+ }
- @Override
- public TableDestination getTable(Integer userId) {
- verifySideInputs();
- // Each user in it's own table.
- return new TableDestination("dataset-id.userid-" + userId,
- "table for userid " + userId);
- }
+ @Override
+ public TableDestination getTable(Integer userId) {
+ verifySideInputs();
+ // Each user in its own table.
+ return new TableDestination(
+ "dataset-id.userid-" + userId, "table for userid " + userId);
+ }
- @Override
- public TableSchema getSchema(Integer userId) {
- verifySideInputs();
- return new TableSchema().setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("name").setType("STRING"),
- new TableFieldSchema().setName("id").setType("INTEGER")));
- }
+ @Override
+ public TableSchema getSchema(Integer userId) {
+ verifySideInputs();
+ return new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("id").setType("INTEGER")));
+ }
- @Override
- public List<PCollectionView<?>> getSideInputs() {
- return ImmutableList.of(sideInput1, sideInput2);
- }
+ @Override
+ public List<PCollectionView<?>> getSideInputs() {
+ return ImmutableList.of(sideInput1, sideInput2);
+ }
- private void verifySideInputs() {
- assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
- Map<String, String> mapSideInput = sideInput(sideInput2);
- assertEquals(3, mapSideInput.size());
- assertThat(mapSideInput,
- allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
- }
- })
+ private void verifySideInputs() {
+ assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
+ Map<String, String> mapSideInput = sideInput(sideInput2);
+ assertEquals(3, mapSideInput.size());
+ assertThat(
+ mapSideInput,
+ allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
+ }
+ })
.withoutValidation());
p.run();
@@ -626,6 +638,59 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ @Category({ValidatesRunner.class, UsesTestStream.class})
+ public void testTriggeredFileLoads() throws Exception {
+ BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ bqOptions.setProject("project-id");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+ FakeDatasetService datasetService = new FakeDatasetService();
+ FakeBigQueryServices fakeBqServices =
+ new FakeBigQueryServices()
+ .withJobService(new FakeJobService())
+ .withDatasetService(datasetService);
+
+ List<TableRow> elements = Lists.newArrayList();
+ for (int i = 0; i < 30; ++i) {
+ elements.add(new TableRow().set("number", i));
+ }
+
+ datasetService.createDataset("project-id", "dataset-id", "", "");
+ TestStream<TableRow> testStream =
+ TestStream.create(TableRowJsonCoder.of())
+ .addElements(
+ elements.get(0), Iterables.toArray(elements.subList(1, 10), TableRow.class))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+ .addElements(
+ elements.get(10), Iterables.toArray(elements.subList(11, 20), TableRow.class))
+ .advanceProcessingTime(Duration.standardMinutes(1))
+ .addElements(
+ elements.get(20), Iterables.toArray(elements.subList(21, 30), TableRow.class))
+ .advanceWatermarkToInfinity();
+
+ Pipeline p = TestPipeline.create(bqOptions);
+ p.apply(testStream)
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to("project-id:dataset-id.table-id")
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("number").setType("INTEGER"))))
+ .withTestServices(fakeBqServices)
+ .withTriggeringFrequency(Duration.standardSeconds(30))
+ .withNumFileShards(2)
+ .withMethod(Method.FILE_LOADS)
+ .withoutValidation());
+ p.run();
+
+ assertThat(
+ datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+ }
+
+ @Test
public void testRetryPolicy() throws Exception {
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject("project-id");
@@ -1796,25 +1861,24 @@ public class BigQueryIOTest implements Serializable {
TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
- PCollectionView<Iterable<WriteBundlesToFiles.Result<TableDestination>>> resultsView =
- p.apply(
- Create.of(files)
- .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of())))
- .apply(View.<WriteBundlesToFiles.Result<TableDestination>>asIterable());
-
String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
PCollectionView<String> tempFilePrefixView =
p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton());
WritePartition<TableDestination> writePartition =
- new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView,
- resultsView, multiPartitionsTag, singlePartitionTag);
-
- DoFnTester<Void, KV<ShardedKey<TableDestination>, List<String>>> tester =
- DoFnTester.of(writePartition);
- tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
+ new WritePartition<>(
+ isSingleton,
+ dynamicDestinations,
+ tempFilePrefixView,
+ multiPartitionsTag,
+ singlePartitionTag);
+
+ DoFnTester<
+ Iterable<WriteBundlesToFiles.Result<TableDestination>>,
+ KV<ShardedKey<TableDestination>, List<String>>>
+ tester = DoFnTester.of(writePartition);
tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
- tester.processElement(null);
+ tester.processElement(files);
List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
if (expectedNumPartitionsPerTable > 1) {
@@ -1864,7 +1928,7 @@ public class BigQueryIOTest implements Serializable {
@Override
public TableSchema getSchema(String destination) {
- throw new UnsupportedOperationException("getSchema not expected in this test.");
+ return null;
}
}
@@ -1926,16 +1990,11 @@ public class BigQueryIOTest implements Serializable {
.apply("CreateJobId", Create.of("jobId"))
.apply(View.<String>asSingleton());
- PCollectionView<Map<String, String>> schemaMapView =
- p.apply("CreateEmptySchema",
- Create.empty(new TypeDescriptor<KV<String, String>>() {}))
- .apply(View.<String, String>asMap());
WriteTables<String> writeTables =
new WriteTables<>(
false,
fakeBqServices,
jobIdTokenView,
- schemaMapView,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
new IdentityDynamicTables());
@@ -1943,7 +2002,6 @@ public class BigQueryIOTest implements Serializable {
DoFnTester<KV<ShardedKey<String>, List<String>>,
KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
- tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
tester.getPipelineOptions().setTempLocation("tempLocation");
for (KV<ShardedKey<String>, List<String>> partition : partitions) {
tester.processElement(partition);
@@ -1999,21 +2057,14 @@ public class BigQueryIOTest implements Serializable {
final int numTempTablesPerFinalTable = 3;
final int numRecordsPerTempTable = 10;
- Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap();
+ Multimap<TableDestination, TableRow> expectedRowsPerTable = ArrayListMultimap.create();
String jobIdToken = "jobIdToken";
- Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
+ Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
+ List<KV<TableDestination, String>> tempTablesElement = Lists.newArrayList();
for (int i = 0; i < numFinalTables; ++i) {
String tableName = "project-id:dataset-id.table_" + i;
TableDestination tableDestination = new TableDestination(
tableName, "table_" + i + "_desc");
- List<String> tables = Lists.newArrayList();
- tempTables.put(tableDestination, tables);
-
- List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
- if (expectedRows == null) {
- expectedRows = Lists.newArrayList();
- expectedRowsPerTable.put(tableDestination, expectedRows);
- }
for (int j = 0; j < numTempTablesPerFinalTable; ++j) {
TableReference tempTable = new TableReference()
.setProjectId("project-id")
@@ -2026,56 +2077,36 @@ public class BigQueryIOTest implements Serializable {
rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k));
}
datasetService.insertAll(tempTable, rows, null);
- expectedRows.addAll(rows);
- tables.add(BigQueryHelpers.toJsonString(tempTable));
+ expectedRowsPerTable.putAll(tableDestination, rows);
+ String tableJson = BigQueryHelpers.toJsonString(tempTable);
+ tempTables.put(tableDestination, tableJson);
+ tempTablesElement.add(KV.of(tableDestination, tableJson));
}
}
- PCollection<KV<TableDestination, String>> tempTablesPCollection =
- p.apply(Create.of(tempTables)
- .withCoder(KvCoder.of(TableDestinationCoder.of(),
- IterableCoder.of(StringUtf8Coder.of()))))
- .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
- KV<TableDestination, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- TableDestination tableDestination = c.element().getKey();
- for (String tempTable : c.element().getValue()) {
- c.output(KV.of(tableDestination, tempTable));
- }
- }
- }));
-
- PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
- PCollectionViews.multimapView(
- tempTablesPCollection,
- WindowingStrategy.globalDefault(),
- KvCoder.of(TableDestinationCoder.of(),
- StringUtf8Coder.of()));
PCollectionView<String> jobIdTokenView = p
.apply("CreateJobId", Create.of("jobId"))
.apply(View.<String>asSingleton());
- WriteRename writeRename = new WriteRename(
- fakeBqServices,
- jobIdTokenView,
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- tempTablesView);
+ WriteRename writeRename =
+ new WriteRename(
+ fakeBqServices,
+ jobIdTokenView,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED);
- DoFnTester<Void, Void> tester = DoFnTester.of(writeRename);
- tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+ DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = DoFnTester.of(writeRename);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
- tester.processElement(null);
+ tester.processElement(tempTablesElement);
- for (Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) {
+ for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
TableDestination tableDestination = entry.getKey();
TableReference tableReference = tableDestination.getTableReference();
Table table = checkNotNull(datasetService.getTable(tableReference));
assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription());
- List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+ Collection<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
assertThat(datasetService.getAllRows(tableReference.getProjectId(),
tableReference.getDatasetId(), tableReference.getTableId()),
containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));
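
The testTriggeredFileLoads test added above also serves as a template for how a pipeline author opts into load jobs on an unbounded input. A minimal sketch follows, assuming GenerateSequence as a stand-in streaming source; the table spec, triggering frequency, and shard count are illustrative values, not recommendations.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class TriggeredFileLoadsExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p
        // An unbounded source; GenerateSequence stands in for a real streaming input.
        .apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
        .apply(
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((Long i) -> new TableRow().set("number", i)))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withSchema(
                    new TableSchema()
                        .setFields(
                            ImmutableList.of(
                                new TableFieldSchema().setName("number").setType("INTEGER"))))
                // Required for FILE_LOADS on an unbounded input: controls how often
                // a new batch of load jobs is issued.
                .withTriggeringFrequency(Duration.standardMinutes(5))
                .withNumFileShards(10)
                .withMethod(Method.FILE_LOADS));

    p.run();
  }
}
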
[2/2] beam git commit: This closes #3662: [BEAM-2700] Support load jobs in streaming
Posted by jk...@apache.org.
This closes #3662: [BEAM-2700] Support load jobs in streaming
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7e8f886
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7e8f886
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7e8f886
Branch: refs/heads/master
Commit: f7e8f886c91ea9d0b51e00331eeb4484e2f6e000
Parents: 0f8e8dd 075d4d4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Aug 14 14:32:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Aug 14 14:32:14 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 447 +++++++++++++------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 168 ++++++-
.../sdk/io/gcp/bigquery/ReifyAsIterable.java | 51 +++
.../io/gcp/bigquery/WriteBundlesToFiles.java | 15 +-
.../sdk/io/gcp/bigquery/WritePartition.java | 13 +-
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 111 +++--
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 38 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 269 ++++++-----
8 files changed, 770 insertions(+), 342 deletions(-)
----------------------------------------------------------------------