Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 06:43:24 UTC
[1/2] incubator-beam git commit: [BEAM-383] Modified BigQueryIO to write based on number of files and file sizes
Repository: incubator-beam
Updated Branches:
refs/heads/master 595d2d4ba -> 34d501278
[BEAM-383] Modified BigQueryIO to write based on number of files and file sizes
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8db6114e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8db6114e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8db6114e
Branch: refs/heads/master
Commit: 8db6114e2087cafc4369f6ec85b04f978dfb1984
Parents: 595d2d4
Author: Ian Zhou <ia...@google.com>
Authored: Wed Jul 20 15:56:21 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 23:40:27 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 585 ++++++++++++++-----
.../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +
.../io/gcp/bigquery/BigQueryServicesImpl.java | 51 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 213 ++++++-
4 files changed, 693 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
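In brief, this change replaces the single FileBasedSink-backed load with a multi-stage write: rows are spilled to temporary files, the files are grouped into partitions bounded by a maximum file count and a maximum total size, each multi-file partition is loaded into its own temporary table, and a final copy job merges the temporary tables into the destination table with the user's write and create dispositions. A minimal, self-contained sketch of the grouping rule follows; the constants mirror MAX_NUM_FILES and MAX_SIZE_BYTES in the diff below, but the standalone helper class is illustrative only, not the committed WritePartition DoFn.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the committed logic lives in BigQueryIO.Write.Bound.WritePartition.
class PartitionSketch {
  static final int MAX_NUM_FILES = 10000;            // max files per load job
  static final long MAX_SIZE_BYTES = 3 * (1L << 40); // max bytes per load job (~3 TiB)

  // Groups temporary files (parallel lists of names and sizes) into partitions
  // that stay within both limits.
  static List<List<String>> partition(List<String> names, List<Long> sizes) {
    List<List<String>> partitions = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int numFiles = 0;
    long numBytes = 0;
    for (int i = 0; i < names.size(); ++i) {
      if (numFiles + 1 > MAX_NUM_FILES || numBytes + sizes.get(i) > MAX_SIZE_BYTES) {
        partitions.add(current);                     // close the full partition
        current = new ArrayList<>();
        numFiles = 0;
        numBytes = 0;
      }
      current.add(names.get(i));
      ++numFiles;
      numBytes += sizes.get(i);
    }
    partitions.add(current);                         // last (possibly only) partition
    return partitions;
  }
}

When only one partition is produced it is loaded directly into the destination table with the user's dispositions; with multiple partitions, each is loaded into a WRITE_EMPTY temporary table first, and only the final copy job touches the destination table.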
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8741c9c..2ba7562 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
@@ -33,9 +34,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
@@ -44,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -52,7 +51,13 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FileIOChannelFactory;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
@@ -80,6 +85,7 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
@@ -93,6 +99,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.io.CountingOutputStream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -110,6 +117,8 @@ import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -196,7 +205,8 @@ import javax.annotation.Nullable;
* <p>See {@link BigQueryIO.Write} for details on how to specify if a write should
* append to an existing table, replace the table, or verify that the table is
* empty. Note that the dataset being written to must already exist. Unbounded PCollections can only
- * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}.
+ * be written using {@link Write.WriteDisposition#WRITE_EMPTY} or
+ * {@link Write.WriteDisposition#WRITE_APPEND}.
*
* <h3>Sharding BigQuery output tables</h3>
* <p>A common use case is to dynamically generate BigQuery table names based on
@@ -1412,6 +1422,19 @@ public class BigQueryIO {
* {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
*/
public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
+ // Maximum number of files in a single partition.
+ static final int MAX_NUM_FILES = 10000;
+
+ // Maximum number of bytes in a single partition.
+ static final long MAX_SIZE_BYTES = 3 * (1L << 40);
+
+ // The maximum number of retry jobs.
+ static final int MAX_RETRY_JOBS = 3;
+
+ // The maximum number of retries to poll the status of a job.
+ // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+ static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
@Nullable final String jsonTableRef;
@Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
@@ -1666,7 +1689,8 @@ public class BigQueryIO {
@Override
public PDone apply(PCollection<TableRow> input) {
- BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ Pipeline p = input.getPipeline();
+ BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
BigQueryServices bqServices = getBigQueryServices();
// In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
@@ -1680,13 +1704,13 @@ public class BigQueryIO {
if (Strings.isNullOrEmpty(table.getProjectId())) {
table.setProjectId(options.getProject());
}
- String jobIdToken = randomUUIDString();
+ String jobIdToken = "beam_job_" + randomUUIDString();
String tempLocation = options.getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQuerySinkTemp"),
+ factory.resolve(tempLocation, "BigQueryWriteTemp"),
jobIdToken);
} catch (IOException e) {
throw new RuntimeException(
@@ -1694,16 +1718,120 @@ public class BigQueryIO {
e);
}
- return input.apply("Write", org.apache.beam.sdk.io.Write.to(
- new BigQuerySink(
+ PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+
+ PCollection<TableRow> inputInGlobalWindow =
+ input.apply(
+ Window.<TableRow>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> results = inputInGlobalWindow
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles(tempFilePrefix)));
+
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<Long, List<String>>> singlePartitionTag =
+ new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+ PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+ .apply("ResultsView", View.<KV<String, Long>>asIterable());
+ PCollectionTuple partitions = singleton.apply(ParDo
+ .of(new WritePartition(
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+ // Write multiple partitions to separate temporary tables
+ PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+ .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+ false,
+ bqServices,
jobIdToken,
- table,
+ tempFilePrefix,
+ toJsonString(table),
jsonSchema,
- getWriteDisposition(),
- getCreateDisposition(),
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED)));
+
+ PCollectionView<Iterable<String>> tempTablesView = tempTables
+ .apply("TempTablesView", View.<String>asIterable());
+ singleton.apply(ParDo
+ .of(new WriteRename(
+ bqServices,
+ jobIdToken,
+ toJsonString(table),
+ writeDisposition,
+ createDisposition,
+ tempTablesView))
+ .withSideInputs(tempTablesView));
+
+ // Write single partition to final table
+ partitions.get(singlePartitionTag)
+ .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+ true,
+ bqServices,
+ jobIdToken,
tempFilePrefix,
- input.getCoder(),
- bqServices)));
+ toJsonString(table),
+ jsonSchema,
+ writeDisposition,
+ createDisposition)));
+
+ return PDone.in(input.getPipeline());
+ }
+
+ private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+ private TableRowWriter writer = null;
+ private final String tempFilePrefix;
+
+ WriteBundles(String tempFilePrefix) {
+ this.tempFilePrefix = tempFilePrefix;
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ if (writer == null) {
+ writer = new TableRowWriter(tempFilePrefix);
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {}", writer);
+ }
+ try {
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard the write result and close the writer.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this OldDoFn cannot be reused.
+ } catch (Exception closeException) {
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ c.output(writer.close());
+ writer = null;
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+ .withLabel("Temporary File Prefix"));
+ }
}
@Override
@@ -1784,192 +1912,361 @@ public class BigQueryIO {
}
}
- /** Disallow construction of utility class. */
- private Write() {}
- }
-
- /**
- * {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
- *
- * <p>It uses BigQuery load job to import files into BigQuery.
- */
- static class BigQuerySink extends FileBasedSink<TableRow> {
- private final String jobIdToken;
- @Nullable private final String jsonTable;
- @Nullable private final String jsonSchema;
- private final WriteDisposition writeDisposition;
- private final CreateDisposition createDisposition;
- private final Coder<TableRow> coder;
- private final BigQueryServices bqServices;
-
- public BigQuerySink(
- String jobIdToken,
- @Nullable TableReference table,
- @Nullable String jsonSchema,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- String tempFile,
- Coder<TableRow> coder,
- BigQueryServices bqServices) {
- super(tempFile, ".json");
- this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
- if (table == null) {
- this.jsonTable = null;
- } else {
- checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
- "Table %s should have a project specified", table);
- this.jsonTable = toJsonString(table);
- }
- this.jsonSchema = jsonSchema;
- this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
- this.createDisposition = checkNotNull(createDisposition, "createDisposition");
- this.coder = checkNotNull(coder, "coder");
- this.bqServices = checkNotNull(bqServices, "bqServices");
- }
-
- @Override
- public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
- PipelineOptions options) {
- return new BigQueryWriteOperation(this);
- }
+ static class TableRowWriter {
+ private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
+ private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+ private final String tempFilePrefix;
+ private String id;
+ private String fileName;
+ private WritableByteChannel channel;
+ protected String mimeType = MimeTypes.TEXT;
+ private CountingOutputStream out;
+
+ TableRowWriter(String basename) {
+ this.tempFilePrefix = basename;
+ }
+
+ public final void open(String uId) throws Exception {
+ id = uId;
+ fileName = tempFilePrefix + id;
+ LOG.debug("Opening {}.", fileName);
+ channel = IOChannelUtils.create(fileName, mimeType);
+ try {
+ out = new CountingOutputStream(Channels.newOutputStream(channel));
+ LOG.debug("Writing header to {}.", fileName);
+ } catch (Exception e) {
+ try {
+ LOG.error("Writing header to {} failed, closing channel.", fileName);
+ channel.close();
+ } catch (IOException closeException) {
+ LOG.error("Closing channel for {} failed", fileName);
+ }
+ throw e;
+ }
+ LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ public void write(TableRow value) throws Exception {
+ CODER.encode(value, out, Context.OUTER);
+ out.write(NEWLINE);
+ }
- builder
- .addIfNotNull(DisplayData.item("schema", jsonSchema)
- .withLabel("Table Schema"))
- .addIfNotNull(DisplayData.item("tableSpec", jsonTable)
- .withLabel("Table Specification"));
+ public final KV<String, Long> close() throws IOException {
+ channel.close();
+ return KV.of(fileName, out.getCount());
+ }
}
- private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
- // The maximum number of retry load jobs.
- private static final int MAX_RETRY_LOAD_JOBS = 3;
+ /**
+ * Partitions temporary files based on number of files and file sizes.
+ */
+ static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+ private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
+ private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
+ private TupleTag<KV<Long, List<String>>> singlePartitionTag;
- // The maximum number of retries to poll the status of a load job.
- // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
- private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+ public WritePartition(
+ PCollectionView<Iterable<KV<String, Long>>> resultsView,
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag,
+ TupleTag<KV<Long, List<String>>> singlePartitionTag) {
+ this.resultsView = resultsView;
+ this.multiPartitionsTag = multiPartitionsTag;
+ this.singlePartitionTag = singlePartitionTag;
+ }
- private final BigQuerySink bigQuerySink;
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
+ if (results.isEmpty()) {
+ TableRowWriter writer = new TableRowWriter(c.element());
+ writer.open(UUID.randomUUID().toString());
+ results.add(writer.close());
+ }
- private BigQueryWriteOperation(BigQuerySink sink) {
- super(checkNotNull(sink, "sink"));
- this.bigQuerySink = sink;
+ long partitionId = 0;
+ int currNumFiles = 0;
+ long currSizeBytes = 0;
+ List<String> currResults = Lists.newArrayList();
+ for (int i = 0; i < results.size(); ++i) {
+ KV<String, Long> fileResult = results.get(i);
+ if (currNumFiles + 1 > Bound.MAX_NUM_FILES
+ || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+ c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+ currResults = Lists.newArrayList();
+ currNumFiles = 0;
+ currSizeBytes = 0;
+ }
+ ++currNumFiles;
+ currSizeBytes += fileResult.getValue();
+ currResults.add(fileResult.getKey());
+ }
+ if (partitionId == 0) {
+ c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
+ } else {
+ c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+ }
}
@Override
- public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
- return new TableRowWriter(this, bigQuerySink.coder);
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ }
+ }
+
+ /**
+ * Writes partitions to BigQuery tables.
+ */
+ static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+ private final boolean singlePartition;
+ private final BigQueryServices bqServices;
+ private final String jobIdToken;
+ private final String tempFilePrefix;
+ private final String jsonTableRef;
+ private final String jsonSchema;
+ private final WriteDisposition writeDisposition;
+ private final CreateDisposition createDisposition;
+
+ public WriteTables(
+ boolean singlePartition,
+ BigQueryServices bqServices,
+ String jobIdToken,
+ String tempFilePrefix,
+ String jsonTableRef,
+ String jsonSchema,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition) {
+ this.singlePartition = singlePartition;
+ this.bqServices = bqServices;
+ this.jobIdToken = jobIdToken;
+ this.tempFilePrefix = tempFilePrefix;
+ this.jsonTableRef = jsonTableRef;
+ this.jsonSchema = jsonSchema;
+ this.writeDisposition = writeDisposition;
+ this.createDisposition = createDisposition;
}
@Override
- public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
- throws IOException, InterruptedException {
- try {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- List<String> tempFiles = Lists.newArrayList();
- for (FileResult result : writerResults) {
- tempFiles.add(result.getFilename());
- }
- if (!tempFiles.isEmpty()) {
- load(
- bigQuerySink.bqServices.getJobService(bqOptions),
- bigQuerySink.jobIdToken,
- fromJsonString(bigQuerySink.jsonTable, TableReference.class),
- tempFiles,
- fromJsonString(bigQuerySink.jsonSchema, TableSchema.class),
- bigQuerySink.writeDisposition,
- bigQuerySink.createDisposition);
- }
- } finally {
- removeTemporaryFiles(options);
+ public void processElement(ProcessContext c) throws Exception {
+ List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
+ String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
+ TableReference ref = fromJsonString(jsonTableRef, TableReference.class);
+ if (!singlePartition) {
+ ref.setTableId(jobIdPrefix);
}
+
+ load(
+ bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ jobIdPrefix,
+ ref,
+ fromJsonString(jsonSchema, TableSchema.class),
+ partition,
+ writeDisposition,
+ createDisposition);
+ c.output(toJsonString(ref));
+
+ removeTemporaryFiles(c.getPipelineOptions(), partition);
}
- /**
- * Import files into BigQuery with load jobs.
- *
- * <p>Returns if files are successfully loaded into BigQuery.
- * Throws a RuntimeException if:
- * 1. The status of one load job is UNKNOWN. This is to avoid duplicating data.
- * 2. It exceeds {@code MAX_RETRY_LOAD_JOBS}.
- *
- * <p>If a load job failed, it will try another load job with a different job id.
- */
private void load(
JobService jobService,
String jobIdPrefix,
TableReference ref,
- List<String> gcsUris,
@Nullable TableSchema schema,
+ List<String> gcsUris,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) throws InterruptedException, IOException {
JobConfigurationLoad loadConfig = new JobConfigurationLoad()
- .setSourceUris(gcsUris)
.setDestinationTable(ref)
.setSchema(schema)
+ .setSourceUris(gcsUris)
.setWriteDisposition(writeDisposition.name())
.setCreateDisposition(createDisposition.name())
.setSourceFormat("NEWLINE_DELIMITED_JSON");
- boolean retrying = false;
String projectId = ref.getProjectId();
- for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
+ for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
- if (retrying) {
- LOG.info("Previous load jobs failed, retrying.");
- }
- LOG.info("Starting BigQuery load job: {}", jobId);
+ LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
Status jobStatus =
- parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
+ parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
switch (jobStatus) {
case SUCCEEDED:
return;
case UNKNOWN:
- throw new RuntimeException("Failed to poll the load job status.");
+ throw new RuntimeException("Failed to poll the load job status of job " + jobId);
case FAILED:
LOG.info("BigQuery load job failed: {}", jobId);
- retrying = true;
continue;
default:
- throw new IllegalStateException("Unexpected job status: " + jobStatus);
+ throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
+ jobStatus, jobId));
}
}
- throw new RuntimeException(
- "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
+ throw new RuntimeException(String.format("Failed to create the load job %s, reached max "
+ + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+ }
+
+ private void removeTemporaryFiles(PipelineOptions options, Collection<String> matches)
+ throws IOException {
+ String pattern = tempFilePrefix + "*";
+ LOG.debug("Finding temporary files matching {}.", pattern);
+ IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
+ if (factory instanceof GcsIOChannelFactory) {
+ GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options);
+ gcsUtil.remove(matches);
+ } else if (factory instanceof FileIOChannelFactory) {
+ for (String filename : matches) {
+ LOG.debug("Removing file {}", filename);
+ boolean exists = Files.deleteIfExists(Paths.get(filename));
+ if (!exists) {
+ LOG.debug("{} does not exist.", filename);
+ }
+ }
+ } else {
+ throw new IOException("Unrecognized file system.");
+ }
}
- }
- private static class TableRowWriter extends FileBasedWriter<TableRow> {
- private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
- private final Coder<TableRow> coder;
- private OutputStream out;
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- public TableRowWriter(
- FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
- super(writeOperation);
- this.mimeType = MimeTypes.TEXT;
- this.coder = coder;
+ builder
+ .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
+ .withLabel("Job ID Token"))
+ .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+ .withLabel("Temporary File Prefix"))
+ .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
+ .withLabel("Table Reference"))
+ .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
+ .withLabel("Table Schema"));
+ }
+ }
+
+ /**
+ * Copies temporary tables to destination table.
+ */
+ static class WriteRename extends OldDoFn<String, Void> {
+ private final BigQueryServices bqServices;
+ private final String jobIdToken;
+ private final String jsonTableRef;
+ private final WriteDisposition writeDisposition;
+ private final CreateDisposition createDisposition;
+ private final PCollectionView<Iterable<String>> tempTablesView;
+
+ public WriteRename(
+ BigQueryServices bqServices,
+ String jobIdToken,
+ String jsonTableRef,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition,
+ PCollectionView<Iterable<String>> tempTablesView) {
+ this.bqServices = bqServices;
+ this.jobIdToken = jobIdToken;
+ this.jsonTableRef = jsonTableRef;
+ this.writeDisposition = writeDisposition;
+ this.createDisposition = createDisposition;
+ this.tempTablesView = tempTablesView;
}
@Override
- protected void prepareWrite(WritableByteChannel channel) throws Exception {
- out = Channels.newOutputStream(channel);
+ public void processElement(ProcessContext c) throws Exception {
+ List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
+
+ // Do not copy if no temp tables are provided
+ if (tempTablesJson.size() == 0) {
+ return;
+ }
+
+ List<TableReference> tempTables = Lists.newArrayList();
+ for (String table : tempTablesJson) {
+ tempTables.add(fromJsonString(table, TableReference.class));
+ }
+ copy(
+ bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ jobIdToken,
+ fromJsonString(jsonTableRef, TableReference.class),
+ tempTables,
+ writeDisposition,
+ createDisposition);
+
+ DatasetService tableService =
+ bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+ removeTemporaryTables(tableService, tempTables);
+ }
+
+ private void copy(
+ JobService jobService,
+ String jobIdPrefix,
+ TableReference ref,
+ List<TableReference> tempTables,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition) throws InterruptedException, IOException {
+ JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
+ .setSourceTables(tempTables)
+ .setDestinationTable(ref)
+ .setWriteDisposition(writeDisposition.name())
+ .setCreateDisposition(createDisposition.name());
+
+ String projectId = ref.getProjectId();
+ for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+ String jobId = jobIdPrefix + "-" + i;
+ LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
+ JobReference jobRef = new JobReference()
+ .setProjectId(projectId)
+ .setJobId(jobId);
+ jobService.startCopyJob(jobRef, copyConfig);
+ Status jobStatus =
+ parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+ switch (jobStatus) {
+ case SUCCEEDED:
+ return;
+ case UNKNOWN:
+ throw new RuntimeException("Failed to poll the copy job status of job " + jobId);
+ case FAILED:
+ LOG.info("BigQuery copy job failed: {}", jobId);
+ continue;
+ default:
+ throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
+ jobStatus, jobId));
+ }
+ }
+ throw new RuntimeException(String.format("Failed to create the copy job %s, reached max "
+ + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+ }
+
+ private void removeTemporaryTables(DatasetService tableService,
+ List<TableReference> tempTables) throws Exception {
+ for (TableReference tableRef : tempTables) {
+ tableService.deleteTable(
+ tableRef.getProjectId(),
+ tableRef.getDatasetId(),
+ tableRef.getTableId());
+ }
}
@Override
- public void write(TableRow value) throws Exception {
- // Use Context.OUTER to encode and NEWLINE as the delimeter.
- coder.encode(value, out, Context.OUTER);
- out.write(NEWLINE);
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
+ .withLabel("Job ID Token"))
+ .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
+ .withLabel("Table Reference"))
+ .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .withLabel("Write Disposition"))
+ .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .withLabel("Create Disposition"));
}
}
+
+ /** Disallow construction of utility class. */
+ private Write() {}
}
private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
@@ -2093,8 +2390,8 @@ public class BigQueryIO {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
- inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
- CreateDisposition.CREATE_IF_NEEDED, tableSchema);
+ inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
+ Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
}
}
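Both the load path (WriteTables.load) and the copy path (WriteRename.copy) introduced above share the same retry shape: every attempt gets a fresh job id suffix, a FAILED poll result triggers another attempt up to MAX_RETRY_JOBS, and an UNKNOWN poll result aborts immediately to avoid loading the same data twice. Below is a compact, self-contained sketch of that policy, assuming a hypothetical JobRunner callback; it is illustrative only, not the committed code.

// Illustrative sketch of the shared retry policy used by the load and copy jobs above.
class RetrySketch {
  enum Status { SUCCEEDED, FAILED, UNKNOWN }

  // Hypothetical callback standing in for "start the job, poll it, and parse its status".
  interface JobRunner {
    Status run(String jobId) throws Exception;
  }

  static void runWithRetries(String jobIdPrefix, int maxRetryJobs, JobRunner runner)
      throws Exception {
    for (int i = 0; i < maxRetryJobs; ++i) {
      String jobId = jobIdPrefix + "-" + i;          // fresh, deterministic job id per attempt
      switch (runner.run(jobId)) {
        case SUCCEEDED:
          return;
        case UNKNOWN:
          // Unknown outcome: give up rather than risk loading the same files twice.
          throw new RuntimeException("Failed to poll the job status of job " + jobId);
        case FAILED:
          continue;                                  // retry under a new job id
      }
    }
    throw new RuntimeException(
        "Failed to create the job " + jobIdPrefix + ", reached max retries: " + maxRetryJobs);
  }
}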
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 29a335d..0af6df8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.Table;
@@ -83,6 +84,12 @@ interface BigQueryServices extends Serializable {
throws IOException, InterruptedException;
/**
+ * Start a BigQuery copy job.
+ */
+ void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+ throws IOException, InterruptedException;
+
+ /**
* Waits for the job is Done, and returns the job.
*
* <p>Returns null if the {@code maxAttempts} retries reached.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index ef17e0f..bd1097f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -39,6 +39,7 @@ import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
@@ -124,9 +125,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void startLoadJob(
@@ -142,9 +143,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
@@ -160,9 +161,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
@@ -175,6 +176,24 @@ class BigQueryServicesImpl implements BigQueryServices {
startJob(job, errorExtractor, client);
}
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+ */
+ @Override
+ public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+ throws IOException, InterruptedException {
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(
+ new JobConfiguration().setCopy(copyConfig));
+
+ startJob(job, errorExtractor, client);
+ }
+
private static void startJob(Job job,
ApiErrorExtractor errorExtractor,
Bigquery client) throws IOException, InterruptedException {
@@ -320,9 +339,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public Table getTable(String projectId, String datasetId, String tableId)
@@ -341,9 +360,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void deleteTable(String projectId, String datasetId, String tableId)
@@ -377,9 +396,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public Dataset getDataset(String projectId, String datasetId)
@@ -398,9 +417,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void createDataset(
@@ -456,9 +475,9 @@ class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC .
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
*/
@Override
public void deleteDataset(String projectId, String datasetId)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7d2df62..1ea1f94 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,14 +26,17 @@ import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
@@ -44,6 +47,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.BigQueryOptions;
@@ -58,16 +64,23 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
import com.google.api.client.util.Data;
import com.google.api.client.util.Strings;
@@ -76,6 +89,7 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
@@ -110,6 +124,9 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -123,6 +140,8 @@ import javax.annotation.Nullable;
@RunWith(JUnit4.class)
public class BigQueryIOTest implements Serializable {
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
// Status.UNKNOWN maps to null
private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
@@ -275,6 +294,12 @@ public class BigQueryIOTest implements Serializable {
}
@Override
+ public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+ throws IOException, InterruptedException {
+ startJob(jobRef);
+ }
+
+ @Override
public Job pollJob(JobReference jobRef, int maxAttempts)
throws InterruptedException {
if (!Strings.isNullOrEmpty(executingProject)) {
@@ -565,7 +590,8 @@ public class BigQueryIOTest implements Serializable {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService()
.startJobReturns("done", "done", "done")
- .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
+ .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED))
+ .withDatasetService(mockDatasetService);
Pipeline p = TestPipeline.create(bqOptions);
p.apply(Create.of(
@@ -584,7 +610,6 @@ public class BigQueryIOTest implements Serializable {
p.run();
logged.verifyInfo("Starting BigQuery load job");
- logged.verifyInfo("Previous load jobs failed, retrying.");
File tempDir = new File(bqOptions.getTempLocation());
assertEquals(0, tempDir.listFiles(new FileFilter() {
@Override
@@ -613,7 +638,7 @@ public class BigQueryIOTest implements Serializable {
.withoutValidation());
thrown.expect(RuntimeException.class);
- thrown.expectMessage("Failed to poll the load job status.");
+ thrown.expectMessage("Failed to poll the load job status");
p.run();
File tempDir = new File(bqOptions.getTempLocation());
@@ -1228,4 +1253,186 @@ public class BigQueryIOTest implements Serializable {
p.run();
}
+
+ @Test
+ public void testWritePartitionEmptyData() throws Exception {
+ final long numFiles = 0;
+ final long fileSize = 0;
+
+ // An empty file is created for no input data. One partition is needed.
+ final long expectedNumPartitions = 1;
+ testWritePartition(numFiles, fileSize, expectedNumPartitions);
+ }
+
+ @Test
+ public void testWritePartitionSinglePartition() throws Exception {
+ final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+ final long fileSize = 1;
+
+ // One partition is needed.
+ final long expectedNumPartitions = 1;
+ testWritePartition(numFiles, fileSize, expectedNumPartitions);
+ }
+
+ @Test
+ public void testWritePartitionManyFiles() throws Exception {
+ final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+ final long fileSize = 1;
+
+ // One partition is needed for each group of BigQueryIO.Write.Bound.MAX_NUM_FILES files.
+ final long expectedNumPartitions = 3;
+ testWritePartition(numFiles, fileSize, expectedNumPartitions);
+ }
+
+ @Test
+ public void testWritePartitionLargeFileSize() throws Exception {
+ final long numFiles = 10;
+ final long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+
+ // One partition is needed for each group of three files.
+ final long expectedNumPartitions = 4;
+ testWritePartition(numFiles, fileSize, expectedNumPartitions);
+ }
+
+ private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions)
+ throws Exception {
+ final List<Long> expectedPartitionIds = Lists.newArrayList();
+ for (long i = 1; i <= expectedNumPartitions; ++i) {
+ expectedPartitionIds.add(i);
+ }
+
+ final List<KV<String, Long>> files = Lists.newArrayList();
+ final List<String> fileNames = Lists.newArrayList();
+ for (int i = 0; i < numFiles; ++i) {
+ String fileName = String.format("files%05d", i);
+ fileNames.add(fileName);
+ files.add(KV.of(fileName, fileSize));
+ }
+
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<Long, List<String>>> singlePartitionTag =
+ new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+ final PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
+ TestPipeline.create(),
+ WindowingStrategy.globalDefault(),
+ KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
+
+ WritePartition writePartition =
+ new WritePartition(filesView, multiPartitionsTag, singlePartitionTag);
+
+ DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition);
+ tester.setSideInput(filesView, GlobalWindow.INSTANCE, files);
+ tester.processElement(tmpFolder.getRoot().getAbsolutePath());
+
+ List<KV<Long, List<String>>> partitions;
+ if (expectedNumPartitions > 1) {
+ partitions = tester.takeSideOutputElements(multiPartitionsTag);
+ } else {
+ partitions = tester.takeSideOutputElements(singlePartitionTag);
+ }
+ List<Long> partitionIds = Lists.newArrayList();
+ List<String> partitionFileNames = Lists.newArrayList();
+ for (KV<Long, List<String>> partition : partitions) {
+ partitionIds.add(partition.getKey());
+ for (String name : partition.getValue()) {
+ partitionFileNames.add(name);
+ }
+ }
+
+ assertEquals(expectedPartitionIds, partitionIds);
+ if (numFiles == 0) {
+ assertThat(partitionFileNames, Matchers.hasSize(1));
+ assertTrue(Files.exists(Paths.get(partitionFileNames.get(0))));
+ assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length,
+ Matchers.equalTo(0));
+ } else {
+ assertEquals(fileNames, partitionFileNames);
+ }
+ }
+
+ @Test
+ public void testWriteTables() throws Exception {
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(new FakeJobService()
+ .startJobReturns("done", "done", "done", "done")
+ .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED))
+ .withDatasetService(mockDatasetService);
+
+ final long numPartitions = 3;
+ final long numFilesPerPartition = 10;
+ final String jobIdToken = "jobIdToken";
+ final String tempFilePrefix = "tempFilePrefix";
+ final String jsonTable = "{}";
+ final String jsonSchema = "{}";
+ final List<String> expectedTempTables = Lists.newArrayList();
+
+ final List<KV<Long, Iterable<List<String>>>> partitions = Lists.newArrayList();
+ for (long i = 0; i < numPartitions; ++i) {
+ List<String> filesPerPartition = Lists.newArrayList();
+ for (int j = 0; j < numFilesPerPartition; ++j) {
+ filesPerPartition.add(String.format("files%05d", j));
+ }
+ partitions.add(KV.of(i, (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+ expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
+ }
+
+ WriteTables writeTables = new WriteTables(
+ false,
+ fakeBqServices,
+ jobIdToken,
+ tempFilePrefix,
+ jsonTable,
+ jsonSchema,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED);
+
+ DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables);
+ for (KV<Long, Iterable<List<String>>> partition : partitions) {
+ tester.processElement(partition);
+ }
+
+ List<String> tempTables = tester.takeOutputElements();
+
+ logged.verifyInfo("Starting BigQuery load job");
+
+ assertEquals(expectedTempTables, tempTables);
+ }
+
+ @Test
+ public void testWriteRename() throws Exception {
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(new FakeJobService()
+ .startJobReturns("done", "done")
+ .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
+ .withDatasetService(mockDatasetService);
+
+ final long numTempTables = 3;
+ final String jobIdToken = "jobIdToken";
+ final String jsonTable = "{}";
+ final List<String> tempTables = Lists.newArrayList();
+ for (long i = 0; i < numTempTables; ++i) {
+ tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
+ }
+
+ final PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
+ TestPipeline.create(),
+ WindowingStrategy.globalDefault(),
+ StringUtf8Coder.of());
+
+ WriteRename writeRename = new WriteRename(
+ fakeBqServices,
+ jobIdToken,
+ jsonTable,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ tempTablesView);
+
+ DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
+ tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+ tester.processElement(null);
+
+ logged.verifyInfo("Starting BigQuery copy job");
+ }
}
[2/2] incubator-beam git commit: Closes #707
Posted by dh...@apache.org.
Closes #707
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34d50127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34d50127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34d50127
Branch: refs/heads/master
Commit: 34d501278344e90115c4baea3af6301c37f58972
Parents: 595d2d4 8db6114
Author: Dan Halperin <dh...@google.com>
Authored: Wed Aug 3 23:41:22 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 23:41:22 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 585 ++++++++++++++-----
.../sdk/io/gcp/bigquery/BigQueryServices.java | 7 +
.../io/gcp/bigquery/BigQueryServicesImpl.java | 51 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 213 ++++++-
4 files changed, 693 insertions(+), 163 deletions(-)
----------------------------------------------------------------------