Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:46 UTC
[12/50] [abbrv] beam git commit: Refactor batch load job path, and add support for data-dependent tables.
Refactor batch load job path, and add support for data-dependent tables.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8581caf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8581caf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8581caf3
Branch: refs/heads/DSL_SQL
Commit: 8581caf388ad688a0e79cfa154262d1e701dee10
Parents: 58ed5c7
Author: Reuven Lax <re...@google.com>
Authored: Wed Mar 29 07:34:10 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 180 ----------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 203 +++++++++++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 3 +-
.../sdk/io/gcp/bigquery/TableDestination.java | 17 +-
.../sdk/io/gcp/bigquery/TableRowWriter.java | 12 +-
.../beam/sdk/io/gcp/bigquery/WriteBundles.java | 82 --------
.../io/gcp/bigquery/WriteBundlesToFiles.java | 102 ++++++++++
.../sdk/io/gcp/bigquery/WritePartition.java | 95 ++++++---
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 63 +++---
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 47 ++---
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 27 +--
11 files changed, 469 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
deleted file mode 100644
index 160b231..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
- */
-class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
- BigQueryIO.Write<T> write;
-
- BatchLoadBigQuery(BigQueryIO.Write<T> write) {
- this.write = write;
- }
-
- @Override
- public WriteResult expand(PCollection<T> input) {
- Pipeline p = input.getPipeline();
- BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
-
- final String stepUuid = BigQueryHelpers.randomUUIDString();
-
- String tempLocation = options.getTempLocation();
- String tempFilePrefix;
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQueryWriteTemp"),
- stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
- e);
- }
-
- // Create a singleton job ID token at execution time.
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
- PCollectionView<String> jobIdTokenView = p
- .apply("TriggerIdCreation", Create.of("ignored"))
- .apply("CreateJobId", MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return stepUuid;
- }
- }))
- .apply(View.<String>asSingleton());
-
- PCollection<T> typedInputInGlobalWindow =
- input.apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
- // Avoid applying the formatFunction if it is the identity formatter.
- PCollection<TableRow> inputInGlobalWindow;
- if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
- inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
- } else {
- inputInGlobalWindow =
- typedInputInGlobalWindow.apply(
- MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction()));
- }
-
- // PCollection of filename, file byte size.
- PCollection<KV<String, Long>> results = inputInGlobalWindow
- .apply("WriteBundles",
- ParDo.of(new WriteBundles(tempFilePrefix)));
-
- TupleTag<KV<Long, List<String>>> multiPartitionsTag =
- new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<Long, List<String>>> singlePartitionTag =
- new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
- // Turn the list of files and record counts in a PCollectionView that can be used as a
- // side input.
- PCollectionView<Iterable<KV<String, Long>>> resultsView = results
- .apply("ResultsView", View.<KV<String, Long>>asIterable());
- PCollectionTuple partitions = singleton.apply(ParDo
- .of(new WritePartition(
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
- // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
- // the import needs to be split into multiple partitions, and those partitions will be
- // specified in multiPartitionsTag.
- PCollection<String> tempTables = partitions.get(multiPartitionsTag)
- .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
- false,
- write.getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- write.getJsonSchema(),
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- write.getTableDescription()))
- .withSideInputs(jobIdTokenView));
-
- PCollectionView<Iterable<String>> tempTablesView = tempTables
- .apply("TempTablesView", View.<String>asIterable());
- singleton.apply(ParDo
- .of(new WriteRename(
- write.getBigQueryServices(),
- jobIdTokenView,
- NestedValueProvider.of(table, new TableRefToJson()),
- write.getWriteDisposition(),
- write.getCreateDisposition(),
- tempTablesView,
- write.getTableDescription()))
- .withSideInputs(tempTablesView, jobIdTokenView));
-
- // Write single partition to final table
- partitions.get(singlePartitionTag)
- .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
- true,
- write.getBigQueryServices(),
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- write.getJsonSchema(),
- write.getWriteDisposition(),
- write.getCreateDisposition(),
- write.getTableDescription()))
- .withSideInputs(jobIdTokenView));
-
- return WriteResult.in(input.getPipeline());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
new file mode 100644
index 0000000..8594211
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
+ */
+class BatchLoads<T> extends
+ PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+ BigQueryIO.Write<T> write;
+
+ private static class ConstantSchemaFunction implements
+ SerializableFunction<TableDestination, TableSchema> {
+ private final @Nullable String jsonSchema;
+
+ ConstantSchemaFunction(TableSchema schema) {
+ this.jsonSchema = BigQueryHelpers.toJsonString(schema);
+ }
+
+ @Override
+ @Nullable
+ public TableSchema apply(TableDestination table) {
+ return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+ }
+ }
+
+ BatchLoads(BigQueryIO.Write<T> write) {
+ this.write = write;
+ }
+
+ @Override
+ public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+ Pipeline p = input.getPipeline();
+ BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+ ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
+
+ final String stepUuid = BigQueryHelpers.randomUUIDString();
+
+ String tempLocation = options.getTempLocation();
+ String tempFilePrefix;
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ tempFilePrefix = factory.resolve(
+ factory.resolve(tempLocation, "BigQueryWriteTemp"),
+ stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+ e);
+ }
+
+ // Create a singleton job ID token at execution time. This will be used as the base for all
+ // load jobs issued from this instance of the transform.
+ PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+ PCollectionView<String> jobIdTokenView = p
+ .apply("TriggerIdCreation", Create.of("ignored"))
+ .apply("CreateJobId", MapElements.via(
+ new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return stepUuid;
+ }
+ }))
+ .apply(View.<String>asSingleton());
+
+ PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
+ input.apply(
+ Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
+ // PCollection of filename, file byte size, and table destination.
+ PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow
+ .apply("WriteBundlesToFiles",
+ ParDo.of(new WriteBundlesToFiles(tempFilePrefix)));
+
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
+ new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+
+ // Turn the list of files and record counts into a PCollectionView that can be used as a
+ // side input.
+ PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results
+ .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
+ // This transform will look at the set of files written for each table, and if any table has
+ // too many files or bytes, will partition that table's files into multiple partitions for
+ // loading.
+ PCollectionTuple partitions = singleton.apply(ParDo
+ .of(new WritePartition(
+ write.getTable(),
+ write.getTableDescription(),
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+ // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
+ // schema function here. If no schema is specified, this function will return null.
+ SerializableFunction<TableDestination, TableSchema> schemaFunction =
+ new ConstantSchemaFunction(write.getSchema());
+
+ // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
+ // the import needs to be split into multiple partitions, and those partitions will be
+ // specified in multiPartitionsTag.
+ PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag)
+ // What's this GroupByKey for? Is it so we have deterministic temp tables? If so, maybe
+ // Reshuffle is better here.
+ .apply("MultiPartitionsGroupByKey",
+ GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+ .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+ false,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ schemaFunction))
+ .withSideInputs(jobIdTokenView));
+
+ // This view maps each final table destination to the set of temporary partitioned tables
+ // the PCollection was loaded into.
+ PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables
+ .apply("TempTablesView", View.<TableDestination, String>asMultimap());
+
+ singleton.apply(ParDo
+ .of(new WriteRename(
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ tempTablesView,
+ write.getTableDescription()))
+ .withSideInputs(tempTablesView, jobIdTokenView));
+
+ // Write single partition to final table
+ partitions.get(singlePartitionTag)
+ .apply("SinglePartitionGroupByKey",
+ GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+ .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+ true,
+ write.getBigQueryServices(),
+ jobIdTokenView,
+ tempFilePrefix,
+ write.getWriteDisposition(),
+ write.getCreateDisposition(),
+ schemaFunction))
+ .withSideInputs(jobIdTokenView));
+
+ return WriteResult.in(input.getPipeline());
+ }
+}
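
A note on ConstantSchemaFunction above: it stores the schema as a JSON string rather than holding the TableSchema object itself, presumably because the function must be Java-serializable to ship to workers and TableSchema is not. A minimal runnable sketch of that round-trip pattern, using hypothetical stand-in types (FakeSchema and SerializableFn are not Beam classes):

    import java.io.Serializable;

    public class ConstantSchemaSketch {
      // Stand-in for TableSchema, which does not implement java.io.Serializable.
      static class FakeSchema {
        final String fieldsJson;
        FakeSchema(String fieldsJson) { this.fieldsJson = fieldsJson; }
      }

      // Stand-in for org.apache.beam.sdk.transforms.SerializableFunction.
      interface SerializableFn<I, O> extends Serializable { O apply(I input); }

      // Hold only the JSON form; rebuild the schema object on each apply().
      static SerializableFn<String, FakeSchema> constantSchema(FakeSchema schema) {
        final String json = (schema == null) ? null : schema.fieldsJson;   // "toJsonString"
        return new SerializableFn<String, FakeSchema>() {
          @Override public FakeSchema apply(String tableSpec) {
            return (json == null) ? null : new FakeSchema(json);           // "fromJsonString"
          }
        };
      }

      public static void main(String[] args) {
        SerializableFn<String, FakeSchema> fn =
            constantSchema(new FakeSchema("[{\"name\": \"x\", \"type\": \"STRING\"}]"));
        System.out.println(fn.apply("project:dataset.table").fieldsJson);
      }
    }
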
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index af19b83..f1baaf7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -984,7 +984,8 @@ public class BigQueryIO {
if (input.isBounded() == IsBounded.UNBOUNDED) {
return rowsWithDestination.apply(new StreamingInserts(this));
} else {
- return input.apply(new BatchLoadBigQuery<T>(this));
+ return rowsWithDestination.apply(new BatchLoads<T>(this));
}
}
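
The behavioral change in this hunk is that the bounded branch now consumes rowsWithDestination, the same keyed (destination, row) collection the streaming branch already uses, so every element carries its own table. A trivial stand-alone illustration of the dispatch (simplified types, not the Beam API):

    public class DispatchSketch {
      enum Boundedness { BOUNDED, UNBOUNDED }

      // Once every element is a (destination, row) pair, boundedness only picks the sink.
      static String chooseWritePath(Boundedness b) {
        return (b == Boundedness.UNBOUNDED) ? "StreamingInserts" : "BatchLoads";
      }

      public static void main(String[] args) {
        System.out.println(chooseWritePath(Boundedness.BOUNDED));    // BatchLoads
        System.out.println(chooseWritePath(Boundedness.UNBOUNDED)); // StreamingInserts
      }
    }
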
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 631afeb..1c2b256 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
+import java.util.Objects;
+
/**
* Encapsulates a BigQuery table destination.
*/
@@ -42,7 +44,6 @@ public class TableDestination {
return tableSpec;
}
-
public TableReference getTableReference() {
return BigQueryHelpers.parseTableSpec(tableSpec);
}
@@ -50,4 +51,18 @@ public class TableDestination {
public String getTableDescription() {
return tableDescription;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TableDestination)) {
+ return false;
+ }
+ TableDestination other = (TableDestination) o;
+ // String fields must be compared by value; == would test reference identity.
+ return Objects.equals(tableSpec, other.tableSpec)
+ && Objects.equals(tableDescription, other.tableDescription);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableSpec, tableDescription);
+ }
}
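
equals() and hashCode() are added here because TableDestination now serves as a map and grouping key throughout the batch-load path: the writer map in WriteBundlesToFiles, the per-table maps in WritePartition, and the multimap side input consumed by WriteRename. A small self-contained illustration of why value semantics matter for such keys (Dest is a hypothetical stand-in, not the Beam class):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class DestinationKeySketch {
      static final class Dest {
        final String tableSpec;
        final String description;
        Dest(String tableSpec, String description) {
          this.tableSpec = tableSpec;
          this.description = description;
        }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Dest)) {
            return false;
          }
          Dest other = (Dest) o;
          // Compare field values; == on Strings would test reference identity.
          return Objects.equals(tableSpec, other.tableSpec)
              && Objects.equals(description, other.description);
        }
        @Override public int hashCode() {
          return Objects.hash(tableSpec, description);
        }
      }

      public static void main(String[] args) {
        Map<Dest, Integer> fileCounts = new HashMap<>();
        // Two distinct instances describing the same table must collapse to one entry,
        // or the same destination would get two writers and two load jobs.
        fileCounts.put(new Dest("project:dataset.users", null), 1);
        fileCounts.put(new Dest("project:dataset.users", null), 2);
        System.out.println(fileCounts.size());  // 1
      }
    }
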
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index 014c498..a1f6153 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -48,6 +48,14 @@ class TableRowWriter {
protected String mimeType = MimeTypes.TEXT;
private CountingOutputStream out;
+ public class Result {
+ String filename;
+ long byteSize;
+ public Result(String filename, long byteSize) {
+ this.filename = filename;
+ this.byteSize = byteSize;
+ }
+ }
TableRowWriter(String basename) {
this.tempFilePrefix = basename;
}
@@ -77,8 +85,8 @@ class TableRowWriter {
out.write(NEWLINE);
}
- public final KV<String, Long> close() throws IOException {
+ public final Result close() throws IOException {
channel.close();
- return KV.of(fileName, out.getCount());
+ return new Result(fileName, out.getCount());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
deleted file mode 100644
index 6219226..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import java.util.UUID;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Writes each bundle of {@link TableRow} elements out to a separate file using
- * {@link TableRowWriter}.
- */
-class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
- private static final Logger LOG = LoggerFactory.getLogger(WriteBundles.class);
-
- private transient TableRowWriter writer = null;
- private final String tempFilePrefix;
-
- WriteBundles(String tempFilePrefix) {
- this.tempFilePrefix = tempFilePrefix;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- if (writer == null) {
- writer = new TableRowWriter(tempFilePrefix);
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {}", writer);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- c.output(writer.close());
- writer = null;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
new file mode 100644
index 0000000..4e6167b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes each bundle of {@link TableRow} elements out to separate files, one per table
+ * destination seen in the bundle, using {@link TableRowWriter}.
+ */
+class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBundlesToFiles.Result> {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class);
+
+ // Map from table destination to a writer for that table.
+ private transient Map<TableDestination, TableRowWriter> writers;
+ private final String tempFilePrefix;
+
+ public static class Result {
+ public String filename;
+ public Long fileByteSize;
+ public TableDestination tableDestination;
+
+ public Result(String filename, Long fileByteSize, TableDestination tableDestination) {
+ this.filename = filename;
+ this.fileByteSize = fileByteSize;
+ this.tableDestination = tableDestination;
+ }
+ }
+ WriteBundlesToFiles(String tempFilePrefix) {
+ this.tempFilePrefix = tempFilePrefix;
+ this.writers = Maps.newHashMap();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ TableRowWriter writer = writers.get(c.element().getKey());
+ if (writer == null) {
+ writer = new TableRowWriter(tempFilePrefix);
+ writer.open(UUID.randomUUID().toString());
+ writers.put(c.element().getKey(), writer);
+ LOG.debug("Done opening writer {}", writer);
+ }
+ try {
+ writer.write(c.element().getValue());
+ } catch (Exception e) {
+ // Discard write result and close the write.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) {
+ TableRowWriter.Result result = entry.getValue().close();
+ c.output(new Result(result.filename, result.byteSize, entry.getKey()));
+ }
+ writers.clear();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+ .withLabel("Temporary File Prefix"));
+ }
+}
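
The structural change here is from a single transient writer per bundle to a map of writers, one per destination observed in the bundle, all flushed in finishBundle. A runnable plain-Java sketch of that pattern (Writer is a stand-in for TableRowWriter, with an in-memory buffer instead of a real file):

    import java.util.HashMap;
    import java.util.Map;

    public class PerKeyWriterSketch {
      static final class Writer {
        private final StringBuilder buf = new StringBuilder();
        void write(String row) { buf.append(row).append('\n'); }
        long close() { return buf.length(); }  // stand-in for bytes written to a file
      }

      private final Map<String, Writer> writers = new HashMap<>();

      void processElement(String destination, String row) {
        Writer writer = writers.get(destination);
        if (writer == null) {
          // Lazily open one writer per destination seen in this bundle.
          writer = new Writer();
          writers.put(destination, writer);
        }
        writer.write(row);
      }

      void finishBundle() {
        // Emit one (destination, bytes) record per writer, then reset for the next bundle.
        for (Map.Entry<String, Writer> entry : writers.entrySet()) {
          System.out.printf("destination=%s bytes=%d%n",
              entry.getKey(), entry.getValue().close());
        }
        writers.clear();
      }

      public static void main(String[] args) {
        PerKeyWriterSketch fn = new PerKeyWriterSketch();
        fn.processElement("project:dataset.users", "{\"id\": 1}");
        fn.processElement("project:dataset.events", "{\"id\": 2}");
        fn.processElement("project:dataset.users", "{\"id\": 3}");
        fn.finishBundle();  // two outputs: one file per destination
      }
    }
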
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 1b6492e..8e1b16d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -18,27 +18,40 @@
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
- * Partitions temporary files based on number of files and file sizes.
+ * Partitions temporary files based on number of files and file sizes. Output elements are
+ * keyed by a (table destination, partition id) pair; the value is the list of files in
+ * that partition of that table.
*/
-class WritePartition extends DoFn<String, KV<Long, List<String>>> {
- private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
- private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
- private TupleTag<KV<Long, List<String>>> singlePartitionTag;
+class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List<String>>> {
+ private final ValueProvider<TableReference> singletonOutputTable;
+ private final String singletonOutputTableDescription;
+ private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView;
+ private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag;
+ private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag;
public WritePartition(
- PCollectionView<Iterable<KV<String, Long>>> resultsView,
- TupleTag<KV<Long, List<String>>> multiPartitionsTag,
- TupleTag<KV<Long, List<String>>> singlePartitionTag) {
+ ValueProvider<TableReference> singletonOutputTable,
+ String singletonOutputTableDescription,
+ PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag,
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag) {
+ this.singletonOutputTable = singletonOutputTable;
+ this.singletonOutputTableDescription = singletonOutputTableDescription;
this.resultsView = resultsView;
this.multiPartitionsTag = multiPartitionsTag;
this.singlePartitionTag = singlePartitionTag;
@@ -46,34 +59,62 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
- if (results.isEmpty()) {
- TableRowWriter writer = new TableRowWriter(c.element());
- writer.open(UUID.randomUUID().toString());
- results.add(writer.close());
+ List<WriteBundlesToFiles.Result> results = Lists.newArrayList(c.sideInput(resultsView));
+
+ // If there are no elements to write _and_ the user specified a constant output table, then
+ // generate an empty table of that name.
+ if (results.isEmpty() && singletonOutputTable != null) {
+ TableReference singletonTable = singletonOutputTable.get();
+ if (singletonTable != null) {
+ TableRowWriter writer = new TableRowWriter(c.element());
+ writer.open(UUID.randomUUID().toString());
+ TableRowWriter.Result writerResult = writer.close();
+ results.add(new Result(writerResult.filename, writerResult.byteSize,
+ new TableDestination(singletonTable, singletonOutputTableDescription)));
+ }
}
+
long partitionId = 0;
- int currNumFiles = 0;
- long currSizeBytes = 0;
- List<String> currResults = Lists.newArrayList();
+ Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap();
+ Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap();
+ Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap();
for (int i = 0; i < results.size(); ++i) {
- KV<String, Long> fileResult = results.get(i);
+ WriteBundlesToFiles.Result fileResult = results.get(i);
+ TableDestination tableDestination = fileResult.tableDestination;
+ List<List<String>> partitions = currResultsMap.get(tableDestination);
+ if (partitions == null) {
+ partitions = Lists.newArrayList();
+ partitions.add(Lists.<String>newArrayList());
+ currResultsMap.put(tableDestination, partitions);
+ }
+ int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0);
+ long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L);
if (currNumFiles + 1 > Write.MAX_NUM_FILES
- || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
- c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
- currResults = Lists.newArrayList();
+ || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
+ // Add a new partition for this table.
+ partitions.add(Lists.<String>newArrayList());
currNumFiles = 0;
currSizeBytes = 0;
+ currNumFilesMap.remove(tableDestination);
+ currSizeBytesMap.remove(tableDestination);
}
- ++currNumFiles;
- currSizeBytes += fileResult.getValue();
- currResults.add(fileResult.getKey());
+ currNumFilesMap.put(tableDestination, currNumFiles + 1);
+ currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize);
+ // Always add to the most recent partition for this table.
+ partitions.get(partitions.size() - 1).add(fileResult.filename);
}
- if (partitionId == 0) {
- c.output(singlePartitionTag, KV.of(++partitionId, currResults));
- } else {
- c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
+
+ for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
+ TableDestination tableDestination = entry.getKey();
+ List<List<String>> partitions = entry.getValue();
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> outputTag =
+ (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
+ for (int i = 0; i < partitions.size(); ++i) {
+ c.output(outputTag, KV.of(KV.of(tableDestination, i + 1), partitions.get(i)));
+ }
}
}
}
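
The loop above packs each table's files greedily into partitions, opening a new partition whenever adding a file would exceed either the file-count or byte-size cap, and then routes tables that ended up with exactly one partition to singlePartitionTag. A standalone sketch of the packing rule (the cap values here are illustrative, not Write.MAX_NUM_FILES / Write.MAX_SIZE_BYTES):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionSketch {
      static final int MAX_NUM_FILES = 3;
      static final long MAX_SIZE_BYTES = 100L;

      static List<List<String>> partition(List<String> files, List<Long> sizes) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(new ArrayList<String>());
        int currNumFiles = 0;
        long currSizeBytes = 0;
        for (int i = 0; i < files.size(); ++i) {
          if (currNumFiles + 1 > MAX_NUM_FILES
              || currSizeBytes + sizes.get(i) > MAX_SIZE_BYTES) {
            // This file would exceed a cap; open a fresh partition for it.
            partitions.add(new ArrayList<String>());
            currNumFiles = 0;
            currSizeBytes = 0;
          }
          // Always add to the most recent partition for this table.
          partitions.get(partitions.size() - 1).add(files.get(i));
          ++currNumFiles;
          currSizeBytes += sizes.get(i);
        }
        return partitions;
      }

      public static void main(String[] args) {
        List<String> files = Arrays.asList("f1", "f2", "f3", "f4");
        List<Long> sizes = Arrays.asList(40L, 40L, 40L, 10L);
        // f3 would push the running size to 120 > 100, so it starts partition 2.
        System.out.println(partition(files, sizes));  // [[f1, f2], [f3, f4]]
      }
    }
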
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 8cb9439..fbfb290 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.common.collect.Maps;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
@@ -25,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
@@ -49,24 +51,21 @@ class WriteRename extends DoFn<String, Void> {
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
- private final ValueProvider<String> jsonTableRef;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
- private final PCollectionView<Iterable<String>> tempTablesView;
+ private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView;
@Nullable
private final String tableDescription;
public WriteRename(
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
- ValueProvider<String> jsonTableRef,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
- PCollectionView<Iterable<String>> tempTablesView,
+ PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView,
@Nullable String tableDescription) {
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
- this.jsonTableRef = jsonTableRef;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.tempTablesView = tempTablesView;
@@ -75,30 +74,40 @@ class WriteRename extends DoFn<String, Void> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
+ Map<TableDestination, Iterable<String>> tempTablesMap =
+ Maps.newHashMap(c.sideInput(tempTablesView));
- // Do not copy if no temp tables are provided
- if (tempTablesJson.size() == 0) {
- return;
- }
+ // Process each destination table.
+ for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) {
+ TableDestination finalTableDestination = entry.getKey();
+ List<String> tempTablesJson = Lists.newArrayList(entry.getValue());
+ // Skip this destination if no temp tables were produced for it; returning here
+ // would incorrectly skip all remaining destinations too.
+ if (tempTablesJson.isEmpty()) {
+ continue;
+ }
+
+ List<TableReference> tempTables = Lists.newArrayList();
+ for (String table : tempTablesJson) {
+ tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
+ }
+
+ // Make sure each destination table gets a unique job id.
+ String jobIdPrefix = String.format(
+ c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode());
+ copy(
+ bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+ jobIdPrefix,
+ finalTableDestination.getTableReference(),
+ tempTables,
+ writeDisposition,
+ createDisposition,
+ tableDescription);
- List<TableReference> tempTables = Lists.newArrayList();
- for (String table : tempTablesJson) {
- tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
+ DatasetService tableService =
+ bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+ removeTemporaryTables(tableService, tempTables);
}
- copy(
- bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- c.sideInput(jobIdToken),
- BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
- tempTables,
- writeDisposition,
- createDisposition,
- tableDescription);
-
- DatasetService tableService =
- bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
- removeTemporaryTables(tableService, tempTables);
}
private void copy(
@@ -170,8 +179,6 @@ class WriteRename extends DoFn<String, Void> {
super.populateDisplayData(builder);
builder
- .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
- .withLabel("Table Reference"))
.add(DisplayData.item("writeDisposition", writeDisposition.toString())
.withLabel("Write Disposition"))
.add(DisplayData.item("createDisposition", createDisposition.toString())
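
Since one WriteRename instance now issues a copy job per destination, each job id is derived from the shared pipeline token plus the destination's hash, keeping ids unique across tables and stable across retries. A sketch of the scheme (the token value is made up):

    public class JobIdSketch {
      // Matches the format used above in WriteRename: token + hex hash of the destination.
      // Assumes the token itself contains no '%' characters.
      static String copyJobId(String jobIdToken, Object tableDestination) {
        return String.format(jobIdToken + "0x%08x", tableDestination.hashCode());
      }

      public static void main(String[] args) {
        String token = "beam_job_abc123_";
        System.out.println(copyJobId(token, "project:dataset.users"));
        System.out.println(copyJobId(token, "project:dataset.events"));
      }
    }
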
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 29680ad..5051c95 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
@@ -57,48 +58,45 @@ import org.slf4j.LoggerFactory;
/**
* Writes partitions to BigQuery tables.
*/
-class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
+class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<String>>>,
+ KV<TableDestination, String>> {
private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
private final boolean singlePartition;
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
private final String tempFilePrefix;
- private final ValueProvider<String> jsonTableRef;
- private final ValueProvider<String> jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
- @Nullable
- private final String tableDescription;
+ private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
public WriteTables(
boolean singlePartition,
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
String tempFilePrefix,
- ValueProvider<String> jsonTableRef,
- ValueProvider<String> jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
- @Nullable String tableDescription) {
+ SerializableFunction<TableDestination, TableSchema> schemaFunction) {
this.singlePartition = singlePartition;
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.tempFilePrefix = tempFilePrefix;
- this.jsonTableRef = jsonTableRef;
- this.jsonSchema = jsonSchema;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
- this.tableDescription = tableDescription;
+ this.schemaFunction = schemaFunction;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
+ TableDestination tableDestination = c.element().getKey().getKey();
+ Integer partition = c.element().getKey().getValue();
+ List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
+ // Job ID must be different for each partition of each table.
String jobIdPrefix = String.format(
- c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
- TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
- TableReference.class);
+ c.sideInput(jobIdToken) + "0x%08x_%05d", tableDestination.hashCode(), partition);
+
+ TableReference ref = tableDestination.getTableReference();
if (!singlePartition) {
ref.setTableId(jobIdPrefix);
}
@@ -108,15 +106,14 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
ref,
- BigQueryHelpers.fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
- partition,
+ schemaFunction.apply(tableDestination),
+ partitionFiles,
writeDisposition,
createDisposition,
- tableDescription);
- c.output(BigQueryHelpers.toJsonString(ref));
+ tableDestination.getTableDescription());
+ c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(ref)));
- removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
+ removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
}
private void load(
@@ -202,12 +199,6 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
builder
.addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"))
- .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
- .withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
- .withLabel("Table Schema"))
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
- .withLabel("Table Description"));
+ .withLabel("Temporary File Prefix"));
}
}
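
WriteTables follows the same convention, appending the partition number to the suffix, and reuses the resulting job id as the temporary table name in the multi-partition case, while a single partition loads straight into the final table. A small sketch of that naming rule (illustrative values only):

    public class TempTableSketch {
      static String loadJobId(String jobIdToken, Object tableDestination, int partition) {
        // token + hex hash of the destination + zero-padded partition number.
        return String.format(jobIdToken + "0x%08x_%05d",
            tableDestination.hashCode(), partition);
      }

      static String targetTableId(String finalTableId, String jobId, boolean singlePartition) {
        // Multi-partition loads land in a temp table named after the job id; WriteRename
        // later copies those temp tables into the final table.
        return singlePartition ? finalTableId : jobId;
      }

      public static void main(String[] args) {
        String jobId = loadJobId("beam_job_abc123_", "project:dataset.users", 2);
        System.out.println(targetTableId("users", jobId, false));  // temp table per partition
        System.out.println(targetTableId("users", jobId, true));   // final table directly
      }
    }
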
http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d953edd..af39483 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -2078,26 +2078,27 @@ public class BigQueryIOTest implements Serializable {
files.add(KV.of(fileName, fileSize));
}
- TupleTag<KV<Long, List<String>>> multiPartitionsTag =
- new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<Long, List<String>>> singlePartitionTag =
- new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
- PCollection<KV<String, Long>> filesPCollection =
- p.apply(Create.of(files).withType(new TypeDescriptor<KV<String, Long>>() {}));
- PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
- filesPCollection,
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
+ new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+
+ PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
+ PCollectionViews.iterableView(
+ p,
WindowingStrategy.globalDefault(),
KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
WritePartition writePartition =
- new WritePartition(filesView, multiPartitionsTag, singlePartitionTag);
+ new WritePartition(null, null, resultsView,
+ multiPartitionsTag, singlePartitionTag);
- DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition);
- tester.setSideInput(filesView, GlobalWindow.INSTANCE, files);
+ DoFnTester<String, KV<KV<TableDestination, Integer>, List<String>>> tester =
+ DoFnTester.of(writePartition);
+ tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
- List<KV<Long, List<String>>> partitions;
+ List<KV<KV<TableDestination, Integer>, List<String>>> partitions;
if (expectedNumPartitions > 1) {
partitions = tester.takeOutputElements(multiPartitionsTag);
} else {