Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:49 UTC
[15/50] [abbrv] beam git commit: Separate streaming writes into two pluggable
components - CreateTables and StreamingWriteTables. Also address many code
review comments. Also merge with master.
Separate streaming writes into two pluggable components - CreateTables and StreamingWriteTables.
Also address many code review comments.
Also merge with master.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d13061c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d13061c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d13061c
Branch: refs/heads/DSL_SQL
Commit: 7d13061cc36466c502bbc1f61d391743dd3739af
Parents: b486137
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 2 21:39:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 176 ++++++++++---------
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 13 ++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 ++-
.../io/gcp/bigquery/BigQueryTableSource.java | 4 +-
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 95 ++++++----
.../io/gcp/bigquery/GenerateShardedTable.java | 3 +-
.../beam/sdk/io/gcp/bigquery/PrepareWrite.java | 80 +++++----
.../beam/sdk/io/gcp/bigquery/ShardedKey.java | 1 +
.../sdk/io/gcp/bigquery/StreamingInserts.java | 44 +----
.../io/gcp/bigquery/StreamingWriteTables.java | 86 +++++++++
.../sdk/io/gcp/bigquery/TableDestination.java | 1 +
.../io/gcp/bigquery/TableDestinationCoder.java | 62 +++----
.../sdk/io/gcp/bigquery/TableRowWriter.java | 14 +-
.../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 14 +-
.../io/gcp/bigquery/WriteBundlesToFiles.java | 25 +--
.../sdk/io/gcp/bigquery/WritePartition.java | 127 ++++++++-----
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 5 +-
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 17 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 66 ++++---
19 files changed, 516 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 06fdfce..236b234 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
@@ -35,7 +34,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -54,17 +53,13 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-
-/**
- * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
- */
+/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
BigQueryIO.Write<?> write;
- private static class ConstantSchemaFunction implements
- SerializableFunction<TableDestination, TableSchema> {
- private final @Nullable
- ValueProvider<String> jsonSchema;
+ private static class ConstantSchemaFunction
+ implements SerializableFunction<TableDestination, TableSchema> {
+ private final @Nullable ValueProvider<String> jsonSchema;
ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
this.jsonSchema = jsonSchema;
@@ -86,7 +81,6 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
final String stepUuid = BigQueryHelpers.randomUUIDString();
@@ -94,40 +88,41 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQueryWriteTemp"),
- stepUuid);
+ tempFilePrefix =
+ factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
} catch (IOException e) {
throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
- e);
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
}
// Create a singleton job ID token at execution time. This will be used as the base for all
// load jobs issued from this instance of the transform.
PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
- PCollectionView<String> jobIdTokenView = p
- .apply("TriggerIdCreation", Create.of("ignored"))
- .apply("CreateJobId", MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return stepUuid;
- }
- }))
- .apply(View.<String>asSingleton());
+ PCollectionView<String> jobIdTokenView =
+ p.apply("TriggerIdCreation", Create.of("ignored"))
+ .apply(
+ "CreateJobId",
+ MapElements.via(
+ new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return stepUuid;
+ }
+ }))
+ .apply(View.<String>asSingleton());
PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
- input.apply("rewindowIntoGlobal",
+ input.apply(
+ "rewindowIntoGlobal",
Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
// PCollection of filename, file byte size, and table destination.
- PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow
- .apply("WriteBundlesToFiles",
- ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
- .setCoder(WriteBundlesToFiles.ResultCoder.of());
+ PCollection<WriteBundlesToFiles.Result> results =
+ inputInGlobalWindow
+ .apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
+ .setCoder(WriteBundlesToFiles.ResultCoder.of());
TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
@@ -136,20 +131,23 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
// Turn the list of files and record counts into a PCollectionView that can be used as a
// side input.
- PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results
- .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
+ PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
+ results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
// This transform will look at the set of files written for each table, and if any table has
// too many files or bytes, will partition that table's files into multiple partitions for
// loading.
- PCollectionTuple partitions = singleton.apply("WritePartition",
- ParDo.of(new WritePartition(
- write.getJsonTableRef(),
- write.getTableDescription(),
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+ PCollectionTuple partitions =
+ singleton.apply(
+ "WritePartition",
+ ParDo.of(
+ new WritePartition(
+ write.getJsonTableRef(),
+ write.getTableDescription(),
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
// Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
// schema function here. If no schema is specified, this function will return null.
@@ -158,55 +156,69 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
new ConstantSchemaFunction(write.getJsonSchema());
Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder =
- KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()),
- ListCoder.of(StringUtf8Coder.of()));
+ KvCoder.of(
+ ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of()));
// If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
// specified in multiPartitionsTag.
- PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag)
- .setCoder(partitionsCoder)
- // What's this GroupByKey for? Is this so we have a deterministic temp tables? If so, maybe
- // Reshuffle is better here.
- .apply("MultiPartitionsGroupByKey",
- GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
- .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
- false,
- write.getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- schemaFunction))
- .withSideInputs(jobIdTokenView));
+ PCollection<KV<TableDestination, String>> tempTables =
+ partitions
+ .get(multiPartitionsTag)
+ .setCoder(partitionsCoder)
+ // Reshuffle will distribute this among multiple workers, and also guard against
+ // reexecution of the WritePartitions step once WriteTables has begun.
+ .apply(
+ "MultiPartitionsReshuffle",
+ Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+ .apply(
+ "MultiPartitionsWriteTables",
+ ParDo.of(
+ new WriteTables(
+ false,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ schemaFunction))
+ .withSideInputs(jobIdTokenView));
// This view maps each final table destination to the set of temporary partitioned tables
// the PCollection was loaded into.
- PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables
- .apply("TempTablesView", View.<TableDestination, String>asMultimap());
-
- singleton.apply("WriteRename", ParDo
- .of(new WriteRename(
- write.getBigQueryServices(),
- jobIdTokenView,
- write.getWriteDisposition(),
- write.getCreateDisposition(),
- tempTablesView))
- .withSideInputs(tempTablesView, jobIdTokenView));
+ PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
+ tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap());
+
+ singleton.apply(
+ "WriteRename",
+ ParDo.of(
+ new WriteRename(
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ tempTablesView))
+ .withSideInputs(tempTablesView, jobIdTokenView));
// Write single partition to final table
- partitions.get(singlePartitionTag)
+ partitions
+ .get(singlePartitionTag)
.setCoder(partitionsCoder)
- .apply("SinglePartitionGroupByKey",
- GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
- .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
- true,
- write.getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- write.getWriteDisposition(),
- write.getCreateDisposition(),
- schemaFunction))
- .withSideInputs(jobIdTokenView));
+ // Reshuffle will distribute this among multiple workers, and also guard against
+ // reexecution of the WritePartitions step once WriteTables has begun.
+ .apply(
+ "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+ .apply(
+ "SinglePartitionWriteTables",
+ ParDo.of(
+ new WriteTables(
+ true,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ schemaFunction))
+ .withSideInputs(jobIdTokenView));
return WriteResult.in(input.getPipeline());
}
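
For context on the GroupByKey-to-Reshuffle switch above: Reshuffle passes key-value pairs
through unchanged but forces the runner to materialize them, so a retry of the step after it
does not re-run the (non-deterministic) step before it. Below is a minimal, self-contained
sketch of that pattern, assuming the SDK at this commit; all pipeline and step names are
illustrative only.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleCheckpointSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, String>> partitions =
        p.apply(Create.of(KV.of("tableA", "file1"), KV.of("tableA", "file2")));
    partitions
        // Elements pass through unchanged, but the runner checkpoints them here,
        // so the partitioning decision is stable once the load step has started.
        .apply("StablePartitions", Reshuffle.<String, String>of())
        .apply("LoadStep",
            MapElements.via(
                new SimpleFunction<KV<String, String>, String>() {
                  @Override
                  public String apply(KV<String, String> kv) {
                    return kv.getKey() + ":" + kv.getValue();
                  }
                }));
    p.run();
  }
}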
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 846103d..e04361c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.Hashing;
import java.io.IOException;
import java.util.ArrayList;
@@ -234,6 +235,18 @@ public class BigQueryHelpers {
}
}
+ // Create a unique job id for a table load.
+ static String createJobId(String prefix, TableDestination tableDestination, int partition) {
+ // Job ID must be different for each partition of each table.
+ String destinationHash =
+ Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
+ if (partition >= 0) {
+ return String.format("%s_%s_%05d", prefix, destinationHash, partition);
+ } else {
+ return String.format("%s_%s", prefix, destinationHash);
+ }
+ }
+
@VisibleForTesting
static class JsonSchemaToTableSchema
implements SerializableFunction<String, TableSchema> {
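
The new createJobId above makes load-job IDs deterministic per (prefix, table, partition).
Here is a standalone sketch of the same logic over plain strings (the real method hashes
TableDestination.toString()):

import com.google.common.hash.Hashing;

public class JobIdSketch {
  static String createJobId(String prefix, String destination, int partition) {
    // Hash the destination so the ID stays within BigQuery's job-ID character
    // rules regardless of what characters the table spec contains.
    String destinationHash =
        Hashing.murmur3_128().hashUnencodedChars(destination).toString();
    return (partition >= 0)
        ? String.format("%s_%s_%05d", prefix, destinationHash, partition)
        : String.format("%s_%s", prefix, destinationHash);
  }

  public static void main(String[] args) {
    // Prints something like "beamload_<32-hex-digit-hash>_00007": unique per
    // (prefix, table, partition), and stable across retries.
    System.out.println(createJobId("beamload", "project:dataset.table", 7));
  }
}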
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 54a25c7..3f5947e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
@@ -445,7 +444,8 @@ public class BigQueryIO {
// Note that a table or query check can fail if the table or dataset are created by
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
- if (getValidate() && table != null) {
+ if (getValidate() && table != null && table.isAccessible()
+ && table.get().getProjectId() != null) {
checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
// Check for source table presence for early failure notification.
DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
@@ -650,6 +650,7 @@ public class BigQueryIO {
public static <T> Write<T> write() {
return new AutoValue_BigQueryIO_Write.Builder<T>()
.setValidate(true)
+ .setTableDescription("")
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
@@ -690,7 +691,8 @@ public class BigQueryIO {
@Nullable abstract ValueProvider<String> getJsonSchema();
abstract CreateDisposition getCreateDisposition();
abstract WriteDisposition getWriteDisposition();
- @Nullable abstract String getTableDescription();
+ /** Table description. Default is empty. */
+ abstract String getTableDescription();
/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();
abstract BigQueryServices getBigQueryServices();
@@ -805,9 +807,6 @@ public class BigQueryIO {
public Write<T> to(ValueProvider<String> tableSpec) {
ensureToNotCalledYet();
String tableDescription = getTableDescription();
- if (tableDescription == null) {
- tableDescription = "";
- }
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
@@ -911,7 +910,7 @@ public class BigQueryIO {
public void validate(PCollection<T> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
- // Exactly one of the table and table reference can be configured.
+ // We must have a destination to write to!
checkState(getTableFunction() != null,
"must set the table reference of a BigQueryIO.Write transform");
@@ -972,8 +971,8 @@ public class BigQueryIO {
@Override
public WriteResult expand(PCollection<T> input) {
PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
- input.apply("PrepareWrite", ParDo.of(
- new PrepareWrite<T>(getTableFunction(), getFormatFunction())))
+ input.apply("PrepareWrite", new PrepareWrite<T>(
+ getTableFunction(), getFormatFunction()))
.setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
@@ -1013,8 +1012,8 @@ public class BigQueryIO {
.withLabel("Table WriteDisposition"))
.addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
- .withLabel("Table Description"));
+ .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription())
+ .withLabel("Table Description"), "");
}
/** Returns the table schema. */
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 22aba64..a28da92 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -109,8 +109,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
@Override
public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
if (tableSizeBytes.get() == null) {
- TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
- TableReference.class);
+ TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class),
+ BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
.getTable(table).getNumBytes();
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index e216553..a78f32d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -1,68 +1,94 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
-
+import org.apache.beam.sdk.values.PCollection;
/**
- * Creates any tables needed before performing streaming writes to the tables. This is a
- * side-effect {l@ink DoFn}, and returns the original collection unchanged.
+ * Creates any tables needed before performing streaming writes to the tables. This is a side-effect
+ * {@link DoFn}, and returns the original collection unchanged.
*/
-public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
- KV<TableDestination, TableRow>> {
+public class CreateTables
+ extends PTransform<
+ PCollection<KV<TableDestination, TableRow>>, PCollection<KV<TableDestination, TableRow>>> {
private final CreateDisposition createDisposition;
private final BigQueryServices bqServices;
private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
-
- /** The list of tables created so far, so we don't try the creation
- each time.
- * TODO: We should put a bound on memory usage of this. Use guava cache instead.
+ /**
+ * The list of tables created so far, so we don't try the creation each time.
+ *
+ * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
*/
private static Set<String> createdTables =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices,
- SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+ public CreateTables(
+ CreateDisposition createDisposition,
+ SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+ this(createDisposition, new BigQueryServicesImpl(), schemaFunction);
+ }
+
+ private CreateTables(
+ CreateDisposition createDisposition,
+ BigQueryServices bqServices,
+ SerializableFunction<TableDestination, TableSchema> schemaFunction) {
this.createDisposition = createDisposition;
this.bqServices = bqServices;
this.schemaFunction = schemaFunction;
}
- @ProcessElement
- public void processElement(ProcessContext context) throws InterruptedException, IOException {
- BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
- possibleCreateTable(options, context.element().getKey());
- context.output(context.element());
+ CreateTables withTestServices(BigQueryServices bqServices) {
+ return new CreateTables(createDisposition, bqServices, schemaFunction);
+ }
+
+ @Override
+ public PCollection<KV<TableDestination, TableRow>> expand(
+ PCollection<KV<TableDestination, TableRow>> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<KV<TableDestination, TableRow>, KV<TableDestination, TableRow>>() {
+ @ProcessElement
+ public void processElement(ProcessContext context)
+ throws InterruptedException, IOException {
+ BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+ possibleCreateTable(options, context.element().getKey());
+ context.output(context.element());
+ }
+ }));
}
private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
@@ -70,8 +96,7 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
String tableSpec = tableDestination.getTableSpec();
TableReference tableReference = tableDestination.getTableReference();
String tableDescription = tableDestination.getTableDescription();
- if (createDisposition != createDisposition.CREATE_NEVER
- && !createdTables.contains(tableSpec)) {
if (createDisposition != CreateDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
@@ -92,6 +117,8 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
}
}
+ /** This method is used by the testing fake to clear static state. */
+ @VisibleForTesting
static void clearCreatedTables() {
synchronized (createdTables) {
createdTables.clear();
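
The table-creation guard above combines a process-wide concurrent set with a second check
under the lock. The same pattern in isolation, with a print standing in for the
DatasetService call:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentCreateSketch {
  // Process-wide cache of tables we have already created (or verified).
  private static final Set<String> createdTables =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  static void possiblyCreate(String tableSpec) {
    if (!createdTables.contains(tableSpec)) {
      synchronized (createdTables) {
        // Re-check under the lock: another thread may have created the table in
        // the meanwhile. Not needed for correctness, just avoids redundant RPCs.
        if (!createdTables.contains(tableSpec)) {
          System.out.println("create table " + tableSpec); // stands in for the RPC
          createdTables.add(tableSpec);
        }
      }
    }
  }
}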
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
index da3a70a..90d41a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -39,8 +39,7 @@ class GenerateShardedTable extends DoFn<KV<TableDestination, TableRow>,
@ProcessElement
public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
- // We output on keys 0-50 to ensure that there's enough batching for
- // BigQuery.
+ // We output on keys in [0, numShards).
String tableSpec = context.element().getKey().getTableSpec();
context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)),
context.element().getValue()));
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index 7712417..a8bdb43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
@@ -23,6 +23,8 @@ import com.google.common.base.Strings;
import java.io.IOException;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
@@ -30,37 +32,49 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
- * Prepare an input {@link PCollection} for writing to BigQuery. Use the table-reference
- * function to determine which tables each element is written to, and format the element into a
- * {@link TableRow} using the user-supplied format function.
+ * Prepare an input {@link PCollection} for writing to BigQuery. Use the table function to determine
+ * which tables each element is written to, and format the element into a {@link TableRow} using the
+ * user-supplied format function.
*/
-public class PrepareWrite<T> extends DoFn<T, KV<TableDestination, TableRow>> {
+public class PrepareWrite<T>
+ extends PTransform<PCollection<T>, PCollection<KV<TableDestination, TableRow>>> {
private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
private SerializableFunction<T, TableRow> formatFunction;
- public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
- SerializableFunction<T, TableRow> formatFunction) {
+ public PrepareWrite(
+ SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
+ SerializableFunction<T, TableRow> formatFunction) {
this.tableFunction = tableFunction;
this.formatFunction = formatFunction;
}
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
- TableDestination tableDestination = tableSpecFromWindowedValue(
- context.getPipelineOptions().as(BigQueryOptions.class),
- ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
- TableRow tableRow = formatFunction.apply(context.element());
- context.output(KV.of(tableDestination, tableRow));
+ @Override
+ public PCollection<KV<TableDestination, TableRow>> expand(PCollection<T> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<T, KV<TableDestination, TableRow>>() {
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window)
+ throws IOException {
+ TableDestination tableDestination =
+ tableSpecFromWindowedValue(
+ context.getPipelineOptions().as(BigQueryOptions.class),
+ ValueInSingleWindow.of(
+ context.element(), context.timestamp(), window, context.pane()));
+ TableRow tableRow = formatFunction.apply(context.element());
+ context.output(KV.of(tableDestination, tableRow));
+ }
+ }));
}
- private TableDestination tableSpecFromWindowedValue(BigQueryOptions options,
- ValueInSingleWindow<T> value) {
+ private TableDestination tableSpecFromWindowedValue(
+ BigQueryOptions options, ValueInSingleWindow<T> value) {
TableDestination tableDestination = tableFunction.apply(value);
TableReference tableReference = tableDestination.getTableReference();
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
tableReference.setProjectId(options.getProject());
- tableDestination = new TableDestination(tableReference,
- tableDestination.getTableDescription());
+ tableDestination =
+ new TableDestination(tableReference, tableDestination.getTableDescription());
}
return tableDestination;
}
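
PrepareWrite is driven by two user-supplied functions: one routing each windowed element to a
TableDestination, one formatting it as a TableRow. A hedged sketch of such a pair follows; the
Event type and its fields are hypothetical, and the class sits in the bigquery package so that
TableDestination resolves.

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class PrepareWriteFnsSketch {
  /** Hypothetical element type, for illustration only. */
  public static class Event implements Serializable {
    public String userId;
    public long timestamp;
  }

  /** Routes each element to a per-user table. */
  static final SerializableFunction<ValueInSingleWindow<Event>, TableDestination> TABLE_FN =
      new SerializableFunction<ValueInSingleWindow<Event>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<Event> value) {
          return new TableDestination(
              "project:dataset.events_" + value.getValue().userId, "per-user events");
        }
      };

  /** Formats each element as the TableRow that will be written. */
  static final SerializableFunction<Event, TableRow> FORMAT_FN =
      new SerializableFunction<Event, TableRow>() {
        @Override
        public TableRow apply(Event event) {
          return new TableRow().set("userId", event.userId).set("ts", event.timestamp);
        }
      };

  // Usage: events.apply("PrepareWrite", new PrepareWrite<Event>(TABLE_FN, FORMAT_FN))
}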
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
index 09b4fbf..c2b739f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -25,6 +25,7 @@ import java.util.Objects;
* A key and a shard number.
*/
class ShardedKey<K> implements Serializable {
+ private static final long serialVersionUID = 1L;
private final K key;
private final int shardNumber;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index ced1d66..efd9c31 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -22,15 +22,10 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -38,8 +33,8 @@ import org.apache.beam.sdk.values.PCollection;
* PTransform that performs a streaming BigQuery write. To increase consistency,
* it leverages BigQuery's best-effort de-dup mechanism.
*/
-class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>,
- WriteResult> {
+public class StreamingInserts extends
+ PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
private final Write<?> write;
private static class ConstantSchemaFunction implements
@@ -74,36 +69,11 @@ class StreamingInserts extends PTransform<PCollection<KV<TableDestination, Table
SerializableFunction<TableDestination, TableSchema> schemaFunction =
new ConstantSchemaFunction(write.getSchema());
- // A naive implementation would be to simply stream data directly to BigQuery.
- // However, this could occasionally lead to duplicated data, e.g., when
- // a VM that runs this code is restarted and the code is re-run.
+ PCollection<KV<TableDestination, TableRow>> writes = input
+ .apply("CreateTables", new CreateTables(write.getCreateDisposition(), schemaFunction)
+ .withTestServices(write.getBigQueryServices()));
- // The above risk is mitigated in this implementation by relying on
- // BigQuery built-in best effort de-dup mechanism.
-
- // To use this mechanism, each input TableRow is tagged with a generated
- // unique id, which is then passed to BigQuery and used to ignore duplicates.
- PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input
- .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(),
- write.getBigQueryServices(), schemaFunction)))
- // We create 50 keys per BigQuery table to generate output on. This is few enough that we
- // get good batching into BigQuery's insert calls, and enough that we can max out the
- // streaming insert quota.
- .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(50)))
- .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
- .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
-
- // To prevent having the same TableRow processed more than once with regenerated
- // different unique ids, this implementation relies on "checkpointing", which is
- // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
- // performed by Reshuffle.
- tagged
- .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
- .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply("StreamingWrite",
- ParDo.of(
- new StreamingWriteFn(write.getBigQueryServices())));
-
- return WriteResult.in(input.getPipeline());
+ return writes.apply(new StreamingWriteTables()
+ .withTestServices(write.getBigQueryServices()));
}
}
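
The expand() above is now a plain composition of the two new pluggable components. The same
composition in isolation (a sketch, placed in the bigquery package for visibility; either
stage can be reused or swapped independently):

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class StreamingPathSketch {
  // Stage 1 creates any missing tables (a side effect; rows pass through unchanged);
  // stage 2 streams the rows to their destinations.
  static WriteResult writeStreaming(
      PCollection<KV<TableDestination, TableRow>> rows,
      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
    return rows
        .apply("CreateTables",
            new CreateTables(CreateDisposition.CREATE_IF_NEEDED, schemaFunction))
        .apply("StreamingWrite", new StreamingWriteTables());
  }
}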
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
new file mode 100644
index 0000000..4ddc1df
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * This transform takes in key-value pairs of {@link TableRow} entries and the
+ * {@link TableDestination} they should be written to. The BigQuery streaming-write service is used
+ * to stream these writes to the appropriate table.
+ *
+ * <p>This transform assumes that all destination tables already exist by the time it sees a write
+ * for that table.
+ */
+public class StreamingWriteTables extends PTransform<
+ PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+ private BigQueryServices bigQueryServices;
+
+ public StreamingWriteTables() {
+ this(new BigQueryServicesImpl());
+ }
+
+ private StreamingWriteTables(BigQueryServices bigQueryServices) {
+ this.bigQueryServices = bigQueryServices;
+ }
+
+ StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) {
+ return new StreamingWriteTables(bigQueryServices);
+ }
+
+ @Override
+ public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+ // A naive implementation would be to simply stream data directly to BigQuery.
+ // However, this could occasionally lead to duplicated data, e.g., when
+ // a VM that runs this code is restarted and the code is re-run.
+
+ // The above risk is mitigated in this implementation by relying on
+ // BigQuery built-in best effort de-dup mechanism.
+
+ // To use this mechanism, each input TableRow is tagged with a generated
+ // unique id, which is then passed to BigQuery and used to ignore duplicates.
+ // We create 50 keys per BigQuery table to generate output on. This is few enough that we
+ // get good batching into BigQuery's insert calls, and enough that we can max out the
+ // streaming insert quota.
+ PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
+ input.apply("ShardTableWrites", ParDo.of
+ (new GenerateShardedTable(50)))
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
+ .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
+
+ // To prevent having the same TableRow processed more than once with regenerated
+ // different unique ids, this implementation relies on "checkpointing", which is
+ // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
+ // performed by Reshuffle.
+ tagged
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+ .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+ .apply("StreamingWrite",
+ ParDo.of(
+ new StreamingWriteFn(bigQueryServices)));
+ return WriteResult.in(input.getPipeline());
+ }
+}
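
The de-dup contract relied on here is BigQuery's insertId: a retried row carrying an insertId
the service has recently seen is best-effort ignored. A sketch of assembling such a request
from the BigQuery API model classes; the actual call is made inside StreamingWriteFn via
BigQueryServices, so this is illustrative only.

import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.List;

public class InsertIdSketch {
  // Pairs each row with its unique id; BigQuery drops a retried row whose
  // insertId it has already accepted.
  static TableDataInsertAllRequest request(List<TableRow> rows, List<String> ids) {
    List<TableDataInsertAllRequest.Rows> content = new ArrayList<>();
    for (int i = 0; i < rows.size(); ++i) {
      content.add(
          new TableDataInsertAllRequest.Rows().setInsertId(ids.get(i)).setJson(rows.get(i)));
    }
    return new TableDataInsertAllRequest().setRows(content);
  }
}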
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 36e1401..962e2cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -27,6 +27,7 @@ import java.util.Objects;
* Encapsulates a BigQuery table destination.
*/
public class TableDestination implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String tableSpec;
private final String tableDescription;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index fa24700..262a00d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.beam.sdk.io.gcp.bigquery;
@@ -26,20 +26,18 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-/**
- * A coder for {@link TableDestination} objects.
- */
+/** A coder for {@link TableDestination} objects. */
public class TableDestinationCoder extends AtomicCoder<TableDestination> {
private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
-
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
@JsonCreator
public static TableDestinationCoder of() {
- return INSTANCE;
- }
+ return INSTANCE;
+ }
@Override
- public void encode(TableDestination value, OutputStream outStream, Context context)
+ public void encode(TableDestination value, OutputStream outStream, Context context)
throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
@@ -50,15 +48,13 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
@Override
public TableDestination decode(InputStream inStream, Context context) throws IOException {
- return new TableDestination(
- stringCoder.decode(inStream, context.nested()),
- stringCoder.decode(inStream, context.nested()));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- return;
- }
+ return new TableDestination(
+ stringCoder.decode(inStream, context.nested()),
+ stringCoder.decode(inStream, context.nested()));
+ }
- StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ return;
+ }
}
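
TableDestinationCoder simply writes the table spec and description as two nested UTF-8
strings, which is why verifyDeterministic can succeed unconditionally. A round-trip sketch,
assuming the pre-2.0 encode/decode signatures with Coder.Context shown above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;

public class TableDestinationCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    TableDestinationCoder coder = TableDestinationCoder.of();
    TableDestination dest = new TableDestination("project:dataset.table", "demo table");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(dest, out, Coder.Context.OUTER);
    TableDestination roundTripped =
        coder.decode(new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    // Both strings survive the round trip.
    System.out.println(roundTripped.getTableSpec());
  }
}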
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index ee8f466..91ef404 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -32,9 +32,7 @@ import org.apache.beam.sdk.util.MimeTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery.
- */
+/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
class TableRowWriter {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
@@ -47,16 +45,18 @@ class TableRowWriter {
protected String mimeType = MimeTypes.TEXT;
private CountingOutputStream out;
- public class Result {
- String filename;
- long byteSize;
+ public static final class Result {
+ final String filename;
+ final long byteSize;
+
public Result(String filename, long byteSize) {
this.filename = filename;
this.byteSize = byteSize;
}
}
+
TableRowWriter(String basename) {
- this.tempFilePrefix = basename;
+ this.tempFilePrefix = basename;
}
public final void open(String uId) throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 7379784..284691e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -28,15 +28,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
/**
- * Fn that tags each table row with a unique id and destination table.
- * To avoid calling UUID.randomUUID() for each element, which can be costly,
- * a randomUUID is generated only once per bucket of data. The actual unique
- * id is created by concatenating this randomUUID with a sequential number.
+ * Fn that tags each table row with a unique id and destination table. To avoid calling
+ * UUID.randomUUID() for each element, which can be costly, a randomUUID is generated only once per
+ * bucket of data. The actual unique id is created by concatenating this randomUUID with a
+ * sequential number.
*/
@VisibleForTesting
class TagWithUniqueIds
extends DoFn<KV<ShardedKey<String>, TableRow>, KV<ShardedKey<String>, TableRowInfo>> {
-
private transient String randomUUID;
private transient long sequenceNo = 0L;
@@ -51,8 +50,9 @@ class TagWithUniqueIds
String uniqueId = randomUUID + sequenceNo++;
// We output on keys 0-50 to ensure that there's enough batching for
// BigQuery.
- context.output(KV.of(context.element().getKey(),
- new TableRowInfo(context.element().getValue(), uniqueId)));
+ context.output(
+ KV.of(
+ context.element().getKey(), new TableRowInfo(context.element().getValue(), uniqueId)));
}
@Override
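
The id scheme from the javadoc above, in isolation: one UUID draw per bundle, one cheap
concatenation per element. This is a sketch; the real DoFn also carries the sharded key
through to the output.

import java.util.UUID;

public class UniqueIdSketch {
  private String randomUUID;
  private long sequenceNo = 0L;

  // Once per bundle: a single (comparatively expensive) UUID draw.
  void startBundle() {
    randomUUID = UUID.randomUUID().toString();
  }

  // Once per element: a cheap concatenation still yields a globally unique id,
  // which BigQuery uses as the insertId to drop duplicates on retry.
  String nextId() {
    return randomUUID + sequenceNo++;
  }
}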
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 869e68a..a25cc90 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -51,10 +51,11 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
* The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
* and encapsulates the table it is destined to as well as the file byte size.
*/
- public static class Result implements Serializable {
- public String filename;
- public Long fileByteSize;
- public TableDestination tableDestination;
+ public static final class Result implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public final String filename;
+ public final Long fileByteSize;
+ public final TableDestination tableDestination;
public Result(String filename, Long fileByteSize, TableDestination tableDestination) {
this.filename = filename;
@@ -68,6 +69,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
*/
public static class ResultCoder extends AtomicCoder<Result> {
private static final ResultCoder INSTANCE = new ResultCoder();
+ private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final VarLongCoder longCoder = VarLongCoder.of();
+ private static final TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
public static ResultCoder of() {
return INSTANCE;
@@ -87,18 +91,15 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
@Override
public Result decode(InputStream inStream, Context context)
throws IOException {
- return new Result(stringCoder.decode(inStream, context.nested()),
- longCoder.decode(inStream, context.nested()),
- tableDestinationCoder.decode(inStream, context.nested()));
+ String filename = stringCoder.decode(inStream, context.nested());
+ long fileByteSize = longCoder.decode(inStream, context.nested());
+ TableDestination tableDestination = tableDestinationCoder.decode(inStream, context.nested());
+ return new Result(filename, fileByteSize, tableDestination);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
}
-
- StringUtf8Coder stringCoder = StringUtf8Coder.of();
- VarLongCoder longCoder = VarLongCoder.of();
- TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
}
WriteBundlesToFiles(String tempFilePrefix) {
@@ -107,6 +108,8 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
@StartBundle
public void startBundle(Context c) {
+ // This must be done each bundle, as by default the {@link DoFn} might be reused between
+ // bundles.
this.writers = Maps.newHashMap();
}
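
The new startBundle comment above reflects a general DoFn rule: instances may be reused across
bundles, so mutable per-bundle state is initialized in @StartBundle, never in the constructor.
A minimal sketch using the same Context-style signature as the code above:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

class PerBundleStateFn extends DoFn<String, String> {
  private transient Map<String, Integer> countsThisBundle;

  @StartBundle
  public void startBundle(Context c) {
    countsThisBundle = new HashMap<>(); // fresh state for every bundle
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Integer n = countsThisBundle.get(c.element());
    countsThisBundle.put(c.element(), n == null ? 1 : n + 1);
    c.output(c.element());
  }
}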
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 9c48b82..9414909 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -44,7 +44,65 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag;
private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag;
- public WritePartition(
+ private static class PartitionData {
+ private int numFiles = 0;
+ private long byteSize = 0;
+ private List<String> filenames = Lists.newArrayList();
+
+ int getNumFiles() {
+ return numFiles;
+ }
+
+ void addFiles(int numFiles) {
+ this.numFiles += numFiles;
+ }
+
+ long getByteSize() {
+ return byteSize;
+ }
+
+ void addBytes(long numBytes) {
+ this.byteSize += numBytes;
+ }
+
+ List<String> getFilenames() {
+ return filenames;
+ }
+
+ void addFilename(String filename) {
+ filenames.add(filename);
+ }
+
+ // Check to see whether we can add to this partition without exceeding the maximum partition
+ // size.
+ boolean canAccept(int numFiles, long numBytes) {
+ return this.numFiles + numFiles <= Write.MAX_NUM_FILES
+ && this.byteSize + numBytes <= Write.MAX_SIZE_BYTES;
+ }
+ }
+
+ private static class DestinationData {
+ private List<PartitionData> partitions = Lists.newArrayList();
+
+ DestinationData() {
+ // Always start out with a single empty partition.
+ partitions.add(new PartitionData());
+ }
+
+ List<PartitionData> getPartitions() {
+ return partitions;
+ }
+
+ PartitionData getLatestPartition() {
+ return partitions.get(partitions.size() - 1);
+ }
+
+ void addPartition(PartitionData partition) {
+ partitions.add(partition);
+ }
+ }
+
+ WritePartition(
ValueProvider<String> singletonOutputJsonTableRef,
String singletonOutputTableDescription,
PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
@@ -76,54 +134,41 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
}
- long partitionId = 0;
- Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap();
- Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap();
- Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap();
- for (int i = 0; i < results.size(); ++i) {
- WriteBundlesToFiles.Result fileResult = results.get(i);
+ Map<TableDestination, DestinationData> currentResults = Maps.newHashMap();
+ for (WriteBundlesToFiles.Result fileResult : results) {
TableDestination tableDestination = fileResult.tableDestination;
- List<List<String>> partitions = currResultsMap.get(tableDestination);
- if (partitions == null) {
- partitions = Lists.newArrayList();
- partitions.add(Lists.<String>newArrayList());
- currResultsMap.put(tableDestination, partitions);
+ DestinationData destinationData = currentResults.get(tableDestination);
+ if (destinationData == null) {
+ destinationData = new DestinationData();
+ currentResults.put(tableDestination, destinationData);
}
- int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0);
- long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L);
- if (currNumFiles + 1 > Write.MAX_NUM_FILES
- || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
- // Add a new partition for this table.
- partitions.add(Lists.<String>newArrayList());
- // c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
- currNumFiles = 0;
- currSizeBytes = 0;
- currNumFilesMap.remove(tableDestination);
- currSizeBytesMap.remove(tableDestination);
+
+ PartitionData latestPartition = destinationData.getLatestPartition();
+ if (!latestPartition.canAccept(1, fileResult.fileByteSize)) {
+ // Too much data, roll over to a new partition.
+ latestPartition = new PartitionData();
+ destinationData.addPartition(latestPartition);
}
- currNumFilesMap.put(tableDestination, currNumFiles + 1);
- currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize);
- // Always add to the most recent partition for this table.
- partitions.get(partitions.size() - 1).add(fileResult.filename);
+ latestPartition.addFilename(fileResult.filename);
+ latestPartition.addFiles(1);
+ latestPartition.addBytes(fileResult.fileByteSize);
}
- for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
+ // Now that we've figured out which tables and partitions to write out, emit this information
+ // to the next stage.
+ for (Map.Entry<TableDestination, DestinationData> entry : currentResults.entrySet()) {
TableDestination tableDestination = entry.getKey();
- List<List<String>> partitions = entry.getValue();
+ DestinationData destinationData = entry.getValue();
+ // Fast path: if a table has only a single partition, the transform loads it directly
+ // into the final table. In that case, we output on a special TupleTag so the enclosing
+ // transform knows to skip the rename step.
TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag =
- (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
- for (int i = 0; i < partitions.size(); ++i) {
- c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), partitions.get(i)));
+ (destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
+ for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
+ PartitionData partitionData = destinationData.getPartitions().get(i);
+ c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1),
+ partitionData.getFilenames()));
}
}
}
-
- private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination,
- T defaultValue) {
- if (map.containsKey(tableDestination)) {
- return map.get(tableDestination);
- } else {
- return defaultValue;
- }
- }
}
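
The PartitionData/DestinationData refactoring above replaces three parallel
maps with a single map of per-destination accumulators, and canAccept() drives
the rollover: files join the current partition until one more file (or its
bytes) would cross Write.MAX_NUM_FILES or Write.MAX_SIZE_BYTES. The accounting,
reduced to a standalone sketch (the limits below are placeholders, not the
real constants):

    // Illustrative rollover accounting: greedily fill a partition until the
    // next file would exceed either limit, then start a new partition.
    import java.util.ArrayList;
    import java.util.List;

    class RolloverSketch {
      static final int MAX_NUM_FILES = 10_000;      // placeholder limit
      static final long MAX_SIZE_BYTES = 1L << 40;  // placeholder limit

      static List<List<String>> partition(List<String> files, List<Long> byteSizes) {
        List<List<String>> partitions = new ArrayList<>();
        List<String> current = new ArrayList<>();
        partitions.add(current);  // always start with one empty partition
        long currentBytes = 0;
        for (int i = 0; i < files.size(); ++i) {
          long size = byteSizes.get(i);
          if (current.size() + 1 > MAX_NUM_FILES || currentBytes + size > MAX_SIZE_BYTES) {
            current = new ArrayList<>();  // too much data: roll over
            partitions.add(current);
            currentBytes = 0;
          }
          current.add(files.get(i));
          currentBytes += size;
        }
        return partitions;
      }
    }

A destination that ends up with exactly one partition takes the
singlePartitionTag fast path and is loaded straight into its final table,
skipping the copy/rename stage.
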
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 752e7d3..9b1c989 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -89,8 +89,9 @@ class WriteRename extends DoFn<String, Void> {
}
// Make sure each destination table gets a unique job id.
- String jobIdPrefix = String.format(
- c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode());
+ String jobIdPrefix = BigQueryHelpers.createJobId(
+ c.sideInput(jobIdToken), finalTableDestination, -1);
+
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
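
Both WriteRename and WriteTables now derive job ids from the new
BigQueryHelpers.createJobId rather than hand-rolled String.format calls, so the
(table, partition) to job-id mapping lives in one place; the -1 above marks
"no partition" for the copy job. A plausible shape for such a helper,
consistent with the format strings it replaces (a sketch, not necessarily the
committed implementation):

    // Hypothetical sketch only; the real implementation lives in
    // BigQueryHelpers.java and may differ. It combines the pipeline's job id
    // token with a per-table hash and a partition number so every
    // (table, partition) pair gets a distinct BigQuery job id.
    static String createJobId(String prefix, TableDestination destination, int partition) {
      return String.format("%s_0x%08x_%05d", prefix, destination.hashCode(), partition);
    }
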
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index f7fe87b..4a6cd2b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -57,11 +57,15 @@ import org.slf4j.LoggerFactory;
/**
* Writes partitions to BigQuery tables.
*
- * <p>The input is a list of files corresponding to a partition of a table. These files are
+ * <p>The input is a list of files corresponding to each partition of a table. These files are
* loaded into a temporary table (or into the final table if there is only one partition). The output
- * is a {@link KV} mapping the final table to the temporary tables for each partition of that table.
+ * is a {@link KV} mapping each final table to a list of the temporary tables containing its data.
+ *
+ * <p>When all the data in the files fits into a single load job, this transform loads
+ * the data directly into the final table, skipping temporary tables. In this case, the output
+ * {@link KV} maps the final table to itself.
*/
-class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
KV<TableDestination, String>> {
private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
@@ -94,10 +98,9 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<St
public void processElement(ProcessContext c) throws Exception {
TableDestination tableDestination = c.element().getKey().getKey();
Integer partition = c.element().getKey().getShardNumber();
- List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
- // Job ID must be different for each partition of each table.
- String jobIdPrefix = String.format(
- c.sideInput(jobIdToken) + "_0x%08x_%05d", tableDestination.hashCode(), partition);
+ List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+ String jobIdPrefix = BigQueryHelpers.createJobId(
+ c.sideInput(jobIdToken), tableDestination, partition);
TableReference ref = tableDestination.getTableReference();
if (!singlePartition) {
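
The hunk above ends right where the single-partition fast path from the class
javadoc materializes: only when singlePartition is false does the load target a
uniquely named temporary table. A hedged sketch of how that branch plays out
(the temp-table naming is an assumption, not the exact source):

    // Illustrative continuation of the branch above: multiple partitions load
    // into temp tables named after the per-partition job id; a single
    // partition loads straight into the final table, and the output KV then
    // maps the final table to itself so the rename stage becomes a no-op.
    import com.google.api.services.bigquery.model.TableReference;

    TableReference ref = tableDestination.getTableReference();
    if (!singlePartition) {
      ref = new TableReference()
          .setProjectId(ref.getProjectId())
          .setDatasetId(ref.getDatasetId())
          .setTableId(jobIdPrefix);  // assumed temp-table naming scheme
    }
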
http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f10be13..d0004e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
@@ -122,7 +123,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -607,13 +607,11 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
public void testStreamingWriteWithDynamicTables() throws Exception {
testWriteWithDynamicTables(true);
}
@Test
- @Category(NeedsRunner.class)
public void testBatchWriteWithDynamicTables() throws Exception {
testWriteWithDynamicTables(false);
}
@@ -842,7 +840,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.writeTableRows().to("foo.com:project:somedataset.sometable");
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -894,7 +892,7 @@ public class BigQueryIOTest implements Serializable {
null,
CreateDisposition.CREATE_IF_NEEDED,
WriteDisposition.WRITE_EMPTY,
- null,
+ "",
false);
}
@@ -905,7 +903,7 @@ public class BigQueryIOTest implements Serializable {
checkWriteObject(
write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
- null);
+ "");
}
@Test
@@ -917,7 +915,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to(table);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -927,7 +925,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.<TableRow>write().to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+ schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -937,7 +935,7 @@ public class BigQueryIOTest implements Serializable {
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
+ null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -947,7 +945,7 @@ public class BigQueryIOTest implements Serializable {
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -957,7 +955,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, "");
}
@Test
@@ -967,7 +965,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, "");
}
@Test
@@ -977,7 +975,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
}
@Test
@@ -1359,7 +1357,6 @@ public class BigQueryIOTest implements Serializable {
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
@@ -1626,9 +1623,11 @@ public class BigQueryIOTest implements Serializable {
TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
+ PCollection<WriteBundlesToFiles.Result> filesPCollection =
+ p.apply(Create.of(files).withType(new TypeDescriptor<WriteBundlesToFiles.Result>() {}));
PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
PCollectionViews.iterableView(
- p,
+ filesPCollection,
WindowingStrategy.globalDefault(),
WriteBundlesToFiles.ResultCoder.of());
@@ -1699,14 +1698,12 @@ public class BigQueryIOTest implements Serializable {
Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
- List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
- Lists.newArrayList();
+ List<KV<ShardedKey<TableDestination>, List<String>>> partitions = Lists.newArrayList();
for (int i = 0; i < numTables; ++i) {
String tableName = String.format("project-id:dataset-id.table%05d", i);
TableDestination tableDestination = new TableDestination(tableName, tableName);
for (int j = 0; j < numPartitions; ++j) {
- String tempTableId = String.format(
- jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
+ String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j);
List<String> filesPerPartition = Lists.newArrayList();
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename = Paths.get(baseDir.toString(),
@@ -1721,7 +1718,7 @@ public class BigQueryIOTest implements Serializable {
filesPerPartition.add(filename);
}
partitions.add(KV.of(ShardedKey.of(tableDestination, j),
- (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+ filesPerPartition));
List<String> expectedTables = expectedTempTables.get(tableDestination);
if (expectedTables == null) {
@@ -1735,11 +1732,6 @@ public class BigQueryIOTest implements Serializable {
}
}
- PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables));
- PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
- expectedTempTablesPCollection,
- WindowingStrategy.globalDefault(),
- StringUtf8Coder.of());
PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
PCollectionView<String> jobIdTokenView =
jobIdTokenCollection.apply(View.<String>asSingleton());
@@ -1753,10 +1745,10 @@ public class BigQueryIOTest implements Serializable {
CreateDisposition.CREATE_IF_NEEDED,
null);
- DoFnTester<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+ DoFnTester<KV<ShardedKey<TableDestination>, List<String>>,
KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
- for (KV<ShardedKey<TableDestination>, Iterable<List<String>>> partition : partitions) {
+ for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
tester.processElement(partition);
}
@@ -1848,11 +1840,27 @@ public class BigQueryIOTest implements Serializable {
}
}
+ PCollection<KV<TableDestination, String>> tempTablesPCollection =
+ p.apply(Create.of(tempTables)
+ .withCoder(KvCoder.of(TableDestinationCoder.of(),
+ IterableCoder.of(StringUtf8Coder.of()))))
+ .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
+ KV<TableDestination, String>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ TableDestination tableDestination = c.element().getKey();
+ for (String tempTable : c.element().getValue()) {
+ c.output(KV.of(tableDestination, tempTable));
+ }
+ }
+ }));
+
PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
PCollectionViews.multimapView(
- p,
+ tempTablesPCollection,
WindowingStrategy.globalDefault(),
- KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()));
+ KvCoder.of(TableDestinationCoder.of(),
+ StringUtf8Coder.of()));
PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
PCollectionView<String> jobIdTokenView =