Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:45 UTC
[3/6] beam git commit: Refactor BigQueryIO.Write helper transforms into separate files.
Refactor BigQueryIO.Write helper transforms into separate files.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6ef0104
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6ef0104
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6ef0104
Branch: refs/heads/master
Commit: d6ef0104d5b8e1076cea2a9d08ad14ba0e8e84f1
Parents: 2cc2e81
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 17 15:30:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Mar 28 08:46:15 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 182 +++
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 64 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1063 +-----------------
.../io/gcp/bigquery/BigQueryQuerySource.java | 21 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 20 +-
.../io/gcp/bigquery/BigQueryTableSource.java | 18 +
.../io/gcp/bigquery/PassThroughThenCleanup.java | 18 +
.../beam/sdk/io/gcp/bigquery/ShardedKey.java | 44 +
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 87 ++
.../sdk/io/gcp/bigquery/StreamWithDeDup.java | 98 ++
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 186 +++
.../beam/sdk/io/gcp/bigquery/TableRowInfo.java | 34 +
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 68 ++
.../sdk/io/gcp/bigquery/TableRowWriter.java | 84 ++
.../gcp/bigquery/TagWithUniqueIdsAndTable.java | 135 +++
.../sdk/io/gcp/bigquery/TransformingSource.java | 18 +
.../beam/sdk/io/gcp/bigquery/WriteBundles.java | 82 ++
.../sdk/io/gcp/bigquery/WritePartition.java | 79 ++
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 180 +++
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 213 ++++
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 39 +-
21 files changed, 1653 insertions(+), 1080 deletions(-)
----------------------------------------------------------------------
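For context, a minimal usage sketch of the write path these files implement. Nothing below is part of the commit, and the BigQueryIO.write() entry point with its to()/withSchema() setters is an assumption about the public API of this era, which the refactor leaves untouched; only the internal wiring (BatchLoadBigQuery, WritePartition, WriteTables, WriteRename, ...) moved.

// Hedged sketch, not from this commit; entry-point method names and the
// table spec are assumed/illustrative.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchLoadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("word").setType("STRING")));
    p.apply(Create.of(new TableRow().set("word", "beam"))
            .withCoder(TableRowJsonCoder.of()))
        // A bounded input with a fixed table spec takes the batch-load path
        // (BatchLoadBigQuery below); an unbounded input takes StreamWithDeDup.
        .apply(BigQueryIO.write()                 // assumed factory method
            .to("project:dataset.table")          // hypothetical table spec
            .withSchema(schema));
    p.run();
  }
}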
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
new file mode 100644
index 0000000..75b1cc7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
+ */
+class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
+ BigQueryIO.Write<T> write;
+
+ BatchLoadBigQuery(BigQueryIO.Write<T> write) {
+ this.write = write;
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ Pipeline p = input.getPipeline();
+ BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+ ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
+
+ final String stepUuid = BigQueryHelpers.randomUUIDString();
+
+ String tempLocation = options.getTempLocation();
+ String tempFilePrefix;
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ tempFilePrefix = factory.resolve(
+ factory.resolve(tempLocation, "BigQueryWriteTemp"),
+ stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+ e);
+ }
+
+ // Create a singleton job ID token at execution time.
+ PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+ PCollectionView<String> jobIdTokenView = p
+ .apply("TriggerIdCreation", Create.of("ignored"))
+ .apply("CreateJobId", MapElements.via(
+ new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return stepUuid;
+ }
+ }))
+ .apply(View.<String>asSingleton());
+
+ PCollection<T> typedInputInGlobalWindow =
+ input.apply(
+ Window.<T>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ // Avoid applying the formatFunction if it is the identity formatter.
+ PCollection<TableRow> inputInGlobalWindow;
+ if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
+ inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
+ } else {
+ inputInGlobalWindow = typedInputInGlobalWindow
+ .apply(MapElements.via(write.getFormatFunction())
+ .withOutputType(new TypeDescriptor<TableRow>() {
+ }));
+ }
+
+ // PCollection of filename, file byte size.
+ PCollection<KV<String, Long>> results = inputInGlobalWindow
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles(tempFilePrefix)));
+
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<Long, List<String>>> singlePartitionTag =
+ new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+ // Turn the list of files and file byte sizes into a PCollectionView that can be used
+ // as a side input.
+ PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+ .apply("ResultsView", View.<KV<String, Long>>asIterable());
+ PCollectionTuple partitions = singleton.apply(ParDo
+ .of(new WritePartition(
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+ // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
+ // the import needs to be split into multiple partitions, and those partitions will be
+ // specified in multiPartitionsTag.
+ PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+ .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+ false,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ write.getJsonSchema(),
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ write.getTableDescription()))
+ .withSideInputs(jobIdTokenView));
+
+ PCollectionView<Iterable<String>> tempTablesView = tempTables
+ .apply("TempTablesView", View.<String>asIterable());
+ singleton.apply(ParDo
+ .of(new WriteRename(
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ tempTablesView,
+ write.getTableDescription()))
+ .withSideInputs(tempTablesView, jobIdTokenView));
+
+ // Write single partition to final table
+ partitions.get(singlePartitionTag)
+ .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+ true,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ write.getJsonSchema(),
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ write.getTableDescription()))
+ .withSideInputs(jobIdTokenView));
+
+ return PDone.in(input.getPipeline());
+ }
+}
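WritePartition, applied in expand() above, packs the written temp files into load-job partitions. Below is a self-contained sketch of that packing rule with small stand-in limits in place of Write.MAX_NUM_FILES and Write.MAX_SIZE_BYTES; it mirrors the loop shown later in this diff.

// Standalone illustration of WritePartition's rule: start a new partition
// whenever adding the next file would exceed the file-count or byte limits.
import java.util.ArrayList;
import java.util.List;

public class PartitionDemo {
  static final int MAX_NUM_FILES = 2;      // stand-in for Write.MAX_NUM_FILES
  static final long MAX_SIZE_BYTES = 100;  // stand-in; the real limit is 11 TiB

  public static void main(String[] args) {
    long[] fileSizes = {60, 60, 30, 90};
    List<List<Integer>> partitions = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    int numFiles = 0;
    long sizeBytes = 0;
    for (int i = 0; i < fileSizes.length; ++i) {
      if (numFiles + 1 > MAX_NUM_FILES || sizeBytes + fileSizes[i] > MAX_SIZE_BYTES) {
        partitions.add(current);
        current = new ArrayList<>();
        numFiles = 0;
        sizeBytes = 0;
      }
      ++numFiles;
      sizeBytes += fileSizes[i];
      current.add(i);
    }
    partitions.add(current);
    System.out.println(partitions); // [[0], [1, 2], [3]]
  }
}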
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 9fba938..37ff124 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -1,5 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
@@ -14,16 +34,34 @@ import java.util.UUID;
import java.util.regex.Matcher;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
-
/**
* A set of helper functions and classes used by {@link BigQueryIO}.
*/
-public class BigQueryHelpers {
+class BigQueryHelpers {
+ private static final String RESOURCE_NOT_FOUND_ERROR =
+ "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
+ + " execution. If the %1$s is created by an earlier stage of the pipeline, this"
+ + " validation can be disabled using #withoutValidation.";
+
+ private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
+ "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by"
+ + " an earlier stage of the pipeline, this validation can be disabled using"
+ + " #withoutValidation.";
+
+ /**
+ * Status of a BigQuery job or request.
+ */
+ enum Status {
+ SUCCEEDED,
+ FAILED,
+ UNKNOWN,
+ }
+
@Nullable
/**
* Return a displayable string representation for a {@link TableReference}.
@@ -138,6 +176,26 @@ public class BigQueryHelpers {
return UUID.randomUUID().toString().replaceAll("-", "");
}
+ static void verifyTableNotExistOrEmpty(
+ DatasetService datasetService,
+ TableReference tableRef) {
+ try {
+ if (datasetService.getTable(tableRef) != null) {
+ checkState(
+ datasetService.isTableEmpty(tableRef),
+ "BigQuery table is not empty: %s.",
+ toTableSpec(tableRef));
+ }
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(
+ "unable to confirm BigQuery table emptiness for table "
+ + toTableSpec(tableRef), e);
+ }
+ }
+
@VisibleForTesting
static class JsonSchemaToTableSchema
implements SerializableFunction<String, TableSchema> {
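The error templates introduced above lean on positional format specifiers, so the resource kind is supplied once and reused. A standalone check of that behavior, with a made-up table spec:

// Demonstrates the %1$s / %2$s positional specifiers used by
// RESOURCE_NOT_FOUND_ERROR above; the arguments are illustrative only.
public class PositionalFormatDemo {
  public static void main(String[] args) {
    String template =
        "BigQuery %1$s not found for table \"%2$s\". Please create the %1$s"
            + " before pipeline execution.";
    System.out.println(String.format(template, "dataset", "project:dataset.table"));
    // -> BigQuery dataset not found for table "project:dataset.table".
    //    Please create the dataset before pipeline execution.
  }
}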
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3f2f3e8..4917083 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -18,19 +18,13 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -39,54 +33,30 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -94,41 +64,19 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +244,7 @@ public class BigQueryIO {
* A formatting function that maps a TableRow to itself. This allows sending a
* {@code PCollection<TableRow>} directly to BigQueryIO.Write.
*/
- private static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
+ static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
new SerializableFunction<TableRow, TableRow>() {
@Override
public TableRow apply(TableRow input) {
@@ -622,7 +570,7 @@ public class BigQueryIO {
*
* <p>If the table's project is not specified, use the executing project.
*/
- @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
BigQueryOptions bqOptions) {
ValueProvider<TableReference> table = getTableProvider();
if (table == null) {
@@ -753,11 +701,11 @@ public class BigQueryIO {
static final long MAX_SIZE_BYTES = 11 * (1L << 40);
// The maximum number of retry jobs.
- private static final int MAX_RETRY_JOBS = 3;
+ static final int MAX_RETRY_JOBS = 3;
// The maximum number of retries to poll the status of a job.
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
- private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+ static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@Nullable abstract ValueProvider<String> getJsonTableRef();
@Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableReference>
@@ -974,26 +922,6 @@ public class BigQueryIO {
return toBuilder().setBigQueryServices(testServices).build();
}
- private static void verifyTableNotExistOrEmpty(
- DatasetService datasetService,
- TableReference tableRef) {
- try {
- if (datasetService.getTable(tableRef) != null) {
- checkState(
- datasetService.isTableEmpty(tableRef),
- "BigQuery table is not empty: %s.",
- BigQueryHelpers.toTableSpec(tableRef));
- }
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(
- "unable to confirm BigQuery table emptiness for table "
- + BigQueryHelpers.toTableSpec(tableRef), e);
- }
- }
-
@Override
public void validate(PCollection<T> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
@@ -1030,7 +958,7 @@ public class BigQueryIO {
verifyTablePresence(datasetService, table);
}
if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- verifyTableNotExistOrEmpty(datasetService, table);
+ BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
}
}
@@ -1074,176 +1002,12 @@ public class BigQueryIO {
@Override
public PDone expand(PCollection<T> input) {
- Pipeline p = input.getPipeline();
- BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
return input.apply(new StreamWithDeDup<T>(this));
- }
-
- ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
- final String stepUuid = BigQueryHelpers.randomUUIDString();
-
- String tempLocation = options.getTempLocation();
- String tempFilePrefix;
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQueryWriteTemp"),
- stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
- e);
- }
-
- // Create a singleton job ID token at execution time.
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
- PCollectionView<String> jobIdTokenView = p
- .apply("TriggerIdCreation", Create.of("ignored"))
- .apply("CreateJobId", MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return stepUuid;
- }
- }))
- .apply(View.<String>asSingleton());
-
- PCollection<T> typedInputInGlobalWindow =
- input.apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
- // Avoid applying the formatFunction if it is the identity formatter.
- PCollection<TableRow> inputInGlobalWindow;
- if (getFormatFunction() == IDENTITY_FORMATTER) {
- inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
} else {
- inputInGlobalWindow = typedInputInGlobalWindow
- .apply(MapElements.via(getFormatFunction())
- .withOutputType(new TypeDescriptor<TableRow>() {
- }));
- }
-
- // PCollection of filename, file byte size.
- PCollection<KV<String, Long>> results = inputInGlobalWindow
- .apply("WriteBundles",
- ParDo.of(new WriteBundles(tempFilePrefix)));
-
- TupleTag<KV<Long, List<String>>> multiPartitionsTag =
- new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<Long, List<String>>> singlePartitionTag =
- new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
- // Turn the list of files and record counts in a PCollectionView that can be used as a
- // side input.
- PCollectionView<Iterable<KV<String, Long>>> resultsView = results
- .apply("ResultsView", View.<KV<String, Long>>asIterable());
- PCollectionTuple partitions = singleton.apply(ParDo
- .of(new WritePartition(
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
- // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
- // the import needs to be split into multiple partitions, and those partitions will be
- // specified in multiPartitionsTag.
- PCollection<String> tempTables = partitions.get(multiPartitionsTag)
- .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
- false,
- getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- getJsonSchema(),
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- getTableDescription()))
- .withSideInputs(jobIdTokenView));
-
- PCollectionView<Iterable<String>> tempTablesView = tempTables
- .apply("TempTablesView", View.<String>asIterable());
- singleton.apply(ParDo
- .of(new WriteRename(
- getBigQueryServices(),
- jobIdTokenView,
- NestedValueProvider.of(table, new TableRefToJson()),
- getWriteDisposition(),
- getCreateDisposition(),
- tempTablesView,
- getTableDescription()))
- .withSideInputs(tempTablesView, jobIdTokenView));
-
- // Write single partition to final table
- partitions.get(singlePartitionTag)
- .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
- true,
- getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- getJsonSchema(),
- getWriteDisposition(),
- getCreateDisposition(),
- getTableDescription()))
- .withSideInputs(jobIdTokenView));
-
- return PDone.in(input.getPipeline());
- }
-
- private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
- private transient TableRowWriter writer = null;
- private final String tempFilePrefix;
-
- WriteBundles(String tempFilePrefix) {
- this.tempFilePrefix = tempFilePrefix;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- if (writer == null) {
- writer = new TableRowWriter(tempFilePrefix);
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {}", writer);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- c.output(writer.close());
- writer = null;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"));
+ return input.apply(new BatchLoadBigQuery<T>(this));
}
}
@@ -1289,7 +1053,7 @@ public class BigQueryIO {
*
* <p>If the table's project is not specified, use the executing project.
*/
- @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
BigQueryOptions bqOptions) {
ValueProvider<TableReference> table = getTable();
if (table == null) {
@@ -1319,391 +1083,6 @@ public class BigQueryIO {
}
- static class TableRowWriter {
- private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
- private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
- private final String tempFilePrefix;
- private String id;
- private String fileName;
- private WritableByteChannel channel;
- protected String mimeType = MimeTypes.TEXT;
- private CountingOutputStream out;
-
- TableRowWriter(String basename) {
- this.tempFilePrefix = basename;
- }
-
- public final void open(String uId) throws Exception {
- id = uId;
- fileName = tempFilePrefix + id;
- LOG.debug("Opening {}.", fileName);
- channel = IOChannelUtils.create(fileName, mimeType);
- try {
- out = new CountingOutputStream(Channels.newOutputStream(channel));
- LOG.debug("Writing header to {}.", fileName);
- } catch (Exception e) {
- try {
- LOG.error("Writing header to {} failed, closing channel.", fileName);
- channel.close();
- } catch (IOException closeException) {
- LOG.error("Closing channel for {} failed", fileName);
- }
- throw e;
- }
- LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
- }
-
- public void write(TableRow value) throws Exception {
- CODER.encode(value, out, Context.OUTER);
- out.write(NEWLINE);
- }
-
- public final KV<String, Long> close() throws IOException {
- channel.close();
- return KV.of(fileName, out.getCount());
- }
- }
-
- /**
- * Partitions temporary files based on number of files and file sizes.
- */
- static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
- private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
- private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
- private TupleTag<KV<Long, List<String>>> singlePartitionTag;
-
- public WritePartition(
- PCollectionView<Iterable<KV<String, Long>>> resultsView,
- TupleTag<KV<Long, List<String>>> multiPartitionsTag,
- TupleTag<KV<Long, List<String>>> singlePartitionTag) {
- this.resultsView = resultsView;
- this.multiPartitionsTag = multiPartitionsTag;
- this.singlePartitionTag = singlePartitionTag;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
- if (results.isEmpty()) {
- TableRowWriter writer = new TableRowWriter(c.element());
- writer.open(UUID.randomUUID().toString());
- results.add(writer.close());
- }
-
- long partitionId = 0;
- int currNumFiles = 0;
- long currSizeBytes = 0;
- List<String> currResults = Lists.newArrayList();
- for (int i = 0; i < results.size(); ++i) {
- KV<String, Long> fileResult = results.get(i);
- if (currNumFiles + 1 > Write.MAX_NUM_FILES
- || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
- c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
- currResults = Lists.newArrayList();
- currNumFiles = 0;
- currSizeBytes = 0;
- }
- ++currNumFiles;
- currSizeBytes += fileResult.getValue();
- currResults.add(fileResult.getKey());
- }
- if (partitionId == 0) {
- c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
- } else {
- c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
- }
- }
- }
-
- /**
- * Writes partitions to BigQuery tables.
- */
- static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
- private final boolean singlePartition;
- private final BigQueryServices bqServices;
- private final PCollectionView<String> jobIdToken;
- private final String tempFilePrefix;
- private final ValueProvider<String> jsonTableRef;
- private final ValueProvider<String> jsonSchema;
- private final WriteDisposition writeDisposition;
- private final CreateDisposition createDisposition;
- @Nullable private final String tableDescription;
-
- public WriteTables(
- boolean singlePartition,
- BigQueryServices bqServices,
- PCollectionView<String> jobIdToken,
- String tempFilePrefix,
- ValueProvider<String> jsonTableRef,
- ValueProvider<String> jsonSchema,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable String tableDescription) {
- this.singlePartition = singlePartition;
- this.bqServices = bqServices;
- this.jobIdToken = jobIdToken;
- this.tempFilePrefix = tempFilePrefix;
- this.jsonTableRef = jsonTableRef;
- this.jsonSchema = jsonSchema;
- this.writeDisposition = writeDisposition;
- this.createDisposition = createDisposition;
- this.tableDescription = tableDescription;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
- String jobIdPrefix = String.format(
- c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
- TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
- TableReference.class);
- if (!singlePartition) {
- ref.setTableId(jobIdPrefix);
- }
-
- load(
- bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- jobIdPrefix,
- ref,
- BigQueryHelpers.fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
- partition,
- writeDisposition,
- createDisposition,
- tableDescription);
- c.output(BigQueryHelpers.toJsonString(ref));
-
- removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
- }
-
- private void load(
- JobService jobService,
- DatasetService datasetService,
- String jobIdPrefix,
- TableReference ref,
- @Nullable TableSchema schema,
- List<String> gcsUris,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable String tableDescription) throws InterruptedException, IOException {
- JobConfigurationLoad loadConfig = new JobConfigurationLoad()
- .setDestinationTable(ref)
- .setSchema(schema)
- .setSourceUris(gcsUris)
- .setWriteDisposition(writeDisposition.name())
- .setCreateDisposition(createDisposition.name())
- .setSourceFormat("NEWLINE_DELIMITED_JSON");
-
- String projectId = ref.getProjectId();
- Job lastFailedLoadJob = null;
- for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
- String jobId = jobIdPrefix + "-" + i;
- JobReference jobRef = new JobReference()
- .setProjectId(projectId)
- .setJobId(jobId);
- jobService.startLoadJob(jobRef, loadConfig);
- Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
- Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
- switch (jobStatus) {
- case SUCCEEDED:
- if (tableDescription != null) {
- datasetService.patchTableDescription(ref, tableDescription);
- }
- return;
- case UNKNOWN:
- throw new RuntimeException(String.format(
- "UNKNOWN status of load job [%s]: %s.", jobId,
- BigQueryHelpers.jobToPrettyString(loadJob)));
- case FAILED:
- lastFailedLoadJob = loadJob;
- continue;
- default:
- throw new IllegalStateException(String.format(
- "Unexpected status [%s] of load job: %s.",
- jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
- }
- }
- throw new RuntimeException(String.format(
- "Failed to create load job with id prefix %s, "
- + "reached max retries: %d, last failed load job: %s.",
- jobIdPrefix,
- Write.MAX_RETRY_JOBS,
- BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
- }
-
- static void removeTemporaryFiles(
- PipelineOptions options,
- String tempFilePrefix,
- Collection<String> files)
- throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
- if (factory instanceof GcsIOChannelFactory) {
- GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options);
- gcsUtil.remove(files);
- } else if (factory instanceof FileIOChannelFactory) {
- for (String filename : files) {
- LOG.debug("Removing file {}", filename);
- boolean exists = Files.deleteIfExists(Paths.get(filename));
- if (!exists) {
- LOG.debug("{} does not exist.", filename);
- }
- }
- } else {
- throw new IOException("Unrecognized file system.");
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"))
- .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
- .withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
- .withLabel("Table Schema"))
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
- .withLabel("Table Description"));
- }
- }
-
- /**
- * Copies temporary tables to destination table.
- */
- static class WriteRename extends DoFn<String, Void> {
- private final BigQueryServices bqServices;
- private final PCollectionView<String> jobIdToken;
- private final ValueProvider<String> jsonTableRef;
- private final WriteDisposition writeDisposition;
- private final CreateDisposition createDisposition;
- private final PCollectionView<Iterable<String>> tempTablesView;
- @Nullable private final String tableDescription;
-
- public WriteRename(
- BigQueryServices bqServices,
- PCollectionView<String> jobIdToken,
- ValueProvider<String> jsonTableRef,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- PCollectionView<Iterable<String>> tempTablesView,
- @Nullable String tableDescription) {
- this.bqServices = bqServices;
- this.jobIdToken = jobIdToken;
- this.jsonTableRef = jsonTableRef;
- this.writeDisposition = writeDisposition;
- this.createDisposition = createDisposition;
- this.tempTablesView = tempTablesView;
- this.tableDescription = tableDescription;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
-
- // Do not copy if no temp tables are provided
- if (tempTablesJson.size() == 0) {
- return;
- }
-
- List<TableReference> tempTables = Lists.newArrayList();
- for (String table : tempTablesJson) {
- tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
- }
- copy(
- bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- c.sideInput(jobIdToken),
- BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
- tempTables,
- writeDisposition,
- createDisposition,
- tableDescription);
-
- DatasetService tableService =
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
- removeTemporaryTables(tableService, tempTables);
- }
-
- private void copy(
- JobService jobService,
- DatasetService datasetService,
- String jobIdPrefix,
- TableReference ref,
- List<TableReference> tempTables,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable String tableDescription) throws InterruptedException, IOException {
- JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
- .setSourceTables(tempTables)
- .setDestinationTable(ref)
- .setWriteDisposition(writeDisposition.name())
- .setCreateDisposition(createDisposition.name());
-
- String projectId = ref.getProjectId();
- Job lastFailedCopyJob = null;
- for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
- String jobId = jobIdPrefix + "-" + i;
- JobReference jobRef = new JobReference()
- .setProjectId(projectId)
- .setJobId(jobId);
- jobService.startCopyJob(jobRef, copyConfig);
- Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
- Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
- switch (jobStatus) {
- case SUCCEEDED:
- if (tableDescription != null) {
- datasetService.patchTableDescription(ref, tableDescription);
- }
- return;
- case UNKNOWN:
- throw new RuntimeException(String.format(
- "UNKNOWN status of copy job [%s]: %s.", jobId,
- BigQueryHelpers.jobToPrettyString(copyJob)));
- case FAILED:
- lastFailedCopyJob = copyJob;
- continue;
- default:
- throw new IllegalStateException(String.format(
- "Unexpected status [%s] of load job: %s.",
- jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
- }
- }
- throw new RuntimeException(String.format(
- "Failed to create copy job with id prefix %s, "
- + "reached max retries: %d, last failed copy job: %s.",
- jobIdPrefix,
- Write.MAX_RETRY_JOBS,
- BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
- }
-
- static void removeTemporaryTables(DatasetService tableService,
- List<TableReference> tempTables) {
- for (TableReference tableRef : tempTables) {
- try {
- LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
- tableService.deleteTable(tableRef);
- } catch (Exception e) {
- LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
- }
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
- .withLabel("Table Reference"))
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
- .withLabel("Write Disposition"))
- .add(DisplayData.item("createDisposition", createDisposition.toString())
- .withLabel("Create Disposition"));
- }
- }
}
private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
@@ -1753,434 +1132,6 @@ public class BigQueryIO {
static void clearCreatedTables() {
StreamingWriteFn.clearCreatedTables();
}
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Implementation of DoFn to perform streaming BigQuery write.
- */
- @VisibleForTesting
- static class StreamingWriteFn
- extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
- /** TableSchema in JSON. Use String to make the class Serializable. */
- @Nullable private final ValueProvider<String> jsonTableSchema;
-
- @Nullable private final String tableDescription;
-
- private final BigQueryServices bqServices;
-
- /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
- private transient Map<String, List<TableRow>> tableRows;
-
- private final Write.CreateDisposition createDisposition;
-
- /** The list of unique ids for each BigQuery table row. */
- private transient Map<String, List<String>> uniqueIdsForTableRows;
-
- /** The list of tables created so far, so we don't try the creation
- each time. */
- private static Set<String> createdTables =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-
- /** Tracks bytes written, exposed as "ByteCount" Metrics Counter. */
- private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount");
-
- /** Constructor. */
- StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
- Write.CreateDisposition createDisposition,
- @Nullable String tableDescription, BigQueryServices bqServices) {
- this.jsonTableSchema = schema == null ? null :
- NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
- this.createDisposition = createDisposition;
- this.bqServices = checkNotNull(bqServices, "bqServices");
- this.tableDescription = tableDescription;
- }
-
- /**
- * Clear the cached map of created tables. Used for testing.
- */
- private static void clearCreatedTables() {
- synchronized (createdTables) {
- createdTables.clear();
- }
- }
-
- /** Prepares a target BigQuery table. */
- @StartBundle
- public void startBundle(Context context) {
- tableRows = new HashMap<>();
- uniqueIdsForTableRows = new HashMap<>();
- }
-
- /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
- @ProcessElement
- public void processElement(ProcessContext context) {
- String tableSpec = context.element().getKey().getKey();
- List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
- List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
- tableSpec);
-
- rows.add(context.element().getValue().tableRow);
- uniqueIds.add(context.element().getValue().uniqueId);
- }
-
- /** Writes the accumulated rows into BigQuery with streaming API. */
- @FinishBundle
- public void finishBundle(Context context) throws Exception {
- BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-
- for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
- TableReference tableReference = getOrCreateTable(options, entry.getKey());
- flushRows(tableReference, entry.getValue(),
- uniqueIdsForTableRows.get(entry.getKey()), options);
- }
- tableRows.clear();
- uniqueIdsForTableRows.clear();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
- .withLabel("Table Schema"))
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
- .withLabel("Table Description"));
- }
-
- public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
- throws InterruptedException, IOException {
- TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
- if (createDisposition != createDisposition.CREATE_NEVER
- && !createdTables.contains(tableSpec)) {
- synchronized (createdTables) {
- // Another thread may have succeeded in creating the table in the meanwhile, so
- // check again. This check isn't needed for correctness, but we add it to prevent
- // every thread from attempting a create and overwhelming our BigQuery quota.
- DatasetService datasetService = bqServices.getDatasetService(options);
- if (!createdTables.contains(tableSpec)) {
- if (datasetService.getTable(tableReference) == null) {
- TableSchema tableSchema = JSON_FACTORY.fromString(
- jsonTableSchema.get(), TableSchema.class);
- datasetService.createTable(
- new Table()
- .setTableReference(tableReference)
- .setSchema(tableSchema)
- .setDescription(tableDescription));
- }
- createdTables.add(tableSpec);
- }
- }
- }
- return tableReference;
- }
-
- /**
- * Writes the accumulated rows into BigQuery with streaming API.
- */
- private void flushRows(TableReference tableReference,
- List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
- throws InterruptedException {
- if (!tableRows.isEmpty()) {
- try {
- long totalBytes = bqServices.getDatasetService(options).insertAll(
- tableReference, tableRows, uniqueIds);
- byteCounter.inc(totalBytes);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private static class ShardedKey<K> {
- private final K key;
- private final int shardNumber;
-
- public static <K> ShardedKey<K> of(K key, int shardNumber) {
- return new ShardedKey<>(key, shardNumber);
- }
-
- private ShardedKey(K key, int shardNumber) {
- this.key = key;
- this.shardNumber = shardNumber;
- }
-
- public K getKey() {
- return key;
- }
-
- public int getShardNumber() {
- return shardNumber;
- }
- }
-
- /**
- * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
- */
- @VisibleForTesting
- static class ShardedKeyCoder<KeyT>
- extends StandardCoder<ShardedKey<KeyT>> {
- public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
- return new ShardedKeyCoder<>(keyCoder);
- }
-
- @JsonCreator
- public static <KeyT> ShardedKeyCoder<KeyT> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<KeyT>> components) {
- checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
- return of(components.get(0));
- }
-
- protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
- this.keyCoder = keyCoder;
- this.shardNumberCoder = VarIntCoder.of();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(keyCoder);
- }
-
- @Override
- public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
- throws IOException {
- keyCoder.encode(key.getKey(), outStream, context.nested());
- shardNumberCoder.encode(key.getShardNumber(), outStream, context);
- }
-
- @Override
- public ShardedKey<KeyT> decode(InputStream inStream, Context context)
- throws IOException {
- return new ShardedKey<>(
- keyCoder.decode(inStream, context.nested()),
- shardNumberCoder.decode(inStream, context));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- keyCoder.verifyDeterministic();
- }
-
- Coder<KeyT> keyCoder;
- VarIntCoder shardNumberCoder;
- }
-
- @VisibleForTesting
- static class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
- private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
-
- @JsonCreator
- public static TableRowInfoCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(TableRowInfo value, OutputStream outStream, Context context)
- throws IOException {
- if (value == null) {
- throw new CoderException("cannot encode a null value");
- }
- tableRowCoder.encode(value.tableRow, outStream, context.nested());
- idCoder.encode(value.uniqueId, outStream, context);
- }
-
- @Override
- public TableRowInfo decode(InputStream inStream, Context context)
- throws IOException {
- return new TableRowInfo(
- tableRowCoder.decode(inStream, context.nested()),
- idCoder.decode(inStream, context));
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this, "TableRows are not deterministic.");
- }
-
- TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of();
- StringUtf8Coder idCoder = StringUtf8Coder.of();
- }
-
- private static class TableRowInfo {
- TableRowInfo(TableRow tableRow, String uniqueId) {
- this.tableRow = tableRow;
- this.uniqueId = uniqueId;
- }
-
- final TableRow tableRow;
- final String uniqueId;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Fn that tags each table row with a unique id and destination table.
- * To avoid calling UUID.randomUUID() for each element, which can be costly,
- * a randomUUID is generated only once per bucket of data. The actual unique
- * id is created by concatenating this randomUUID with a sequential number.
- */
- @VisibleForTesting
- static class TagWithUniqueIdsAndTable<T>
- extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
- /** TableSpec to write to. */
- private final ValueProvider<String> tableSpec;
-
- /** User function mapping windowed values to {@link TableReference} in JSON. */
- private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
-
- /** User function mapping user type to a TableRow. */
- private final SerializableFunction<T, TableRow> formatFunction;
-
- private transient String randomUUID;
- private transient long sequenceNo = 0L;
-
- TagWithUniqueIdsAndTable(BigQueryOptions options,
- ValueProvider<TableReference> table,
- SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction,
- SerializableFunction<T, TableRow> formatFunction) {
- checkArgument(table == null ^ tableRefFunction == null,
- "Exactly one of table or tableRefFunction should be set");
- if (table != null) {
- if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
- TableReference tableRef = table.get()
- .setProjectId(options.as(BigQueryOptions.class).getProject());
- table = NestedValueProvider.of(
- StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
- new JsonTableRefToTableRef());
- }
- this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
- } else {
- tableSpec = null;
- }
- this.tableRefFunction = tableRefFunction;
- this.formatFunction = formatFunction;
- }
-
-
- @StartBundle
- public void startBundle(Context context) {
- randomUUID = UUID.randomUUID().toString();
- }
-
- /** Tag the input with a unique id. */
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
- String uniqueId = randomUUID + sequenceNo++;
- ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
- String tableSpec = tableSpecFromWindowedValue(
- context.getPipelineOptions().as(BigQueryOptions.class),
- ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
- // We output on keys 0-50 to ensure that there's enough batching for
- // BigQuery.
- context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
- new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder.addIfNotNull(DisplayData.item("table", tableSpec));
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
- .withLabel("Table Reference Function"));
- }
- }
-
- @VisibleForTesting
- ValueProvider<String> getTableSpec() {
- return tableSpec;
- }
-
- private String tableSpecFromWindowedValue(BigQueryOptions options,
- ValueInSingleWindow<T> value) {
- if (tableSpec != null) {
- return tableSpec.get();
- } else {
- TableReference table = tableRefFunction.apply(value);
- if (table.getProjectId() == null) {
- table.setProjectId(options.getProject());
- }
- return BigQueryHelpers.toTableSpec(table);
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * PTransform that performs streaming BigQuery write. To increase consistency,
- * it leverages BigQuery best effort de-dup mechanism.
- */
- private static class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
- private final Write<T> write;
-
- /** Constructor. */
- StreamWithDeDup(Write<T> write) {
- this.write = write;
- }
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
- @Override
- public PDone expand(PCollection<T> input) {
- // A naive implementation would be to simply stream data directly to BigQuery.
- // However, this could occasionally lead to duplicated data, e.g., when
- // a VM that runs this code is restarted and the code is re-run.
-
- // The above risk is mitigated in this implementation by relying on
- // BigQuery built-in best effort de-dup mechanism.
-
- // To use this mechanism, each input TableRow is tagged with a generated
- // unique id, which is then passed to BigQuery and used to ignore duplicates.
-
- PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
- input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
- input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
- write.getTableRefFunction(), write.getFormatFunction())));
-
- // To prevent having the same TableRow processed more than once with regenerated
- // different unique ids, this implementation relies on "checkpointing", which is
- // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
- // performed by Reshuffle.
- NestedValueProvider<TableSchema, String> schema =
- write.getJsonSchema() == null
- ? null
- : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
- tagged
- .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
- .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(
- ParDo.of(
- new StreamingWriteFn(
- schema,
- write.getCreateDisposition(),
- write.getTableDescription(),
- write.getBigQueryServices())));
-
- // Note that the implementation to return PDone here breaks the
- // implicit assumption about the job execution order. If a user
- // implements a PTransform that takes PDone returned here as its
- // input, the transform may not necessarily be executed after
- // the BigQueryIO.Write.
-
- return PDone.in(input.getPipeline());
- }
- }
-
- /**
- * Status of a BigQuery job or request.
- */
- enum Status {
- SUCCEEDED,
- FAILED,
- UNKNOWN,
- }
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index a909957..9153157 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -14,9 +32,10 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index ff50e6d..746258f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -18,7 +36,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index aae0faa..cbd5781 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index 612afbe..75f7b93 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
new file mode 100644
index 0000000..8c968df
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/**
+ * A key and a shard number.
+ */
+class ShardedKey<K> {
+ private final K key;
+ private final int shardNumber;
+
+ public static <K> ShardedKey<K> of(K key, int shardNumber) {
+ return new ShardedKey<>(key, shardNumber);
+ }
+
+ ShardedKey(K key, int shardNumber) {
+ this.key = key;
+ this.shardNumber = shardNumber;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public int getShardNumber() {
+ return shardNumber;
+ }
+}
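
A minimal usage sketch of ShardedKey (values illustrative; TagWithUniqueIdsAndTable
pairs a table spec with a random shard number in exactly this way):

  ShardedKey<String> key = ShardedKey.of("my-project:logs.events", 7);
  key.getKey();          // "my-project:logs.events"
  key.getShardNumber();  // 7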
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
new file mode 100644
index 0000000..be4e71c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.PropertyNames;
+
+/**
+ * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
+ */
+@VisibleForTesting
+class ShardedKeyCoder<KeyT>
+ extends StandardCoder<ShardedKey<KeyT>> {
+ public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+ return new ShardedKeyCoder<>(keyCoder);
+ }
+
+ @JsonCreator
+ public static <KeyT> ShardedKeyCoder<KeyT> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<KeyT>> components) {
+ checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+ return of(components.get(0));
+ }
+
+ protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+ this.keyCoder = keyCoder;
+ this.shardNumberCoder = VarIntCoder.of();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder);
+ }
+
+ @Override
+ public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
+ throws IOException {
+ keyCoder.encode(key.getKey(), outStream, context.nested());
+ shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+ }
+
+ @Override
+ public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+ throws IOException {
+ return new ShardedKey<>(
+ keyCoder.decode(inStream, context.nested()),
+ shardNumberCoder.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ keyCoder.verifyDeterministic();
+ }
+
+ private final Coder<KeyT> keyCoder;
+ private final VarIntCoder shardNumberCoder;
+}
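
A minimal round-trip sketch for ShardedKeyCoder, assuming the Context-based
encode/decode signatures shown above (the key value is illustrative):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.coders.StringUtf8Coder;

  Coder<ShardedKey<String>> coder = ShardedKeyCoder.of(StringUtf8Coder.of());
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  // Encode key and shard number back to back; throws IOException on failure.
  coder.encode(ShardedKey.of("my-project:logs.events", 3), bytes, Coder.Context.OUTER);
  ShardedKey<String> decoded =
      coder.decode(new ByteArrayInputStream(bytes.toByteArray()), Coder.Context.OUTER);
  // decoded.getKey() -> "my-project:logs.events"; decoded.getShardNumber() -> 3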
http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
new file mode 100644
index 0000000..f667295
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * A PTransform that performs a streaming BigQuery write. To increase consistency,
+ * it leverages BigQuery's best-effort de-dup mechanism.
+ */
+class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
+ private final Write<T> write;
+
+ /** Constructor. */
+ StreamWithDeDup(Write<T> write) {
+ this.write = write;
+ }
+
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ // A naive implementation would be to simply stream data directly to BigQuery.
+ // However, this could occasionally lead to duplicated data, e.g., when
+ // a VM that runs this code is restarted and the code is re-run.
+
+ // The above risk is mitigated in this implementation by relying on
+ // BigQuery's built-in best-effort de-dup mechanism.
+
+ // To use this mechanism, each input TableRow is tagged with a generated
+ // unique id, which is then passed to BigQuery and used to ignore duplicates.
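+ // (A hedged note for orientation, since the request assembly lives in
+ // StreamingWriteFn and the BigQueryServices implementation: the unique id is
+ // ultimately sent as the per-row insertId of the streaming insert request,
+ // which BigQuery uses to drop best-effort duplicates on retries.)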
+
+ PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
+ input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
+ input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
+ write.getTableRefFunction(), write.getFormatFunction())));
+
+ // To prevent the same TableRow from being processed more than once with
+ // regenerated (and therefore different) unique ids, this implementation relies
+ // on "checkpointing", achieved as a side effect of having StreamingWriteFn
+ // immediately follow the GBK performed by Reshuffle.
+ NestedValueProvider<TableSchema, String> schema =
+ write.getJsonSchema() == null
+ ? null
+ : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
+ tagged
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+ .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+ .apply(
+ ParDo.of(
+ new StreamingWriteFn(
+ schema,
+ write.getCreateDisposition(),
+ write.getTableDescription(),
+ write.getBigQueryServices())));
+
+ // Note that returning PDone here breaks the implicit assumption
+ // about job execution order: if a user implements a PTransform
+ // that takes the PDone returned here as its input, that transform
+ // is not necessarily executed after this BigQueryIO.Write
+ // completes.
+
+ return PDone.in(input.getPipeline());
+ }
+}
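
For orientation, a hedged sketch of the call site this transform targets. The
actual wiring lives in BigQueryIO.Write.expand (trimmed by this commit), so the
surrounding control flow here is an assumption, not the committed code:

  // Hypothetical caller, inside BigQueryIO.Write<T>.expand(PCollection<T> input):
  if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
    // Streaming inserts: tag rows with unique ids, reshuffle, then StreamingWriteFn.
    return input.apply(new StreamWithDeDup<T>(this));
  }
  // Bounded input instead takes the batch-load path (BatchLoadBigQuery).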