You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/24 22:48:13 UTC
[1/2] beam git commit: [BEAM-1071] Allow for BigQueryIO to write tables with CREATE_NEVER disposition
Repository: beam
Updated Branches:
refs/heads/master 11c3cd70b -> f2389ab7b
[BEAM-1071] Allow for BigQueryIO to write tables with CREATE_NEVER disposition
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc369522
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc369522
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc369522
Branch: refs/heads/master
Commit: dc369522d1cfa46ae9058919d93229de05db2b6a
Parents: 11c3cd7
Author: Sam McVeety <sg...@google.com>
Authored: Mon Dec 12 18:47:20 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 24 14:41:39 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 51 ++++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++++++
2 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index aff199a..fa49f55 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1925,10 +1925,17 @@ public class BigQueryIO {
if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
// We will use BigQuery's streaming write API -- validate supported dispositions.
- checkArgument(
- createDisposition != CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
- + " using a tablespec function.");
+ if (tableRefFunction != null) {
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+ + " function.");
+ }
+ if (jsonSchema == null) {
+ checkArgument(
+ createDisposition == CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+ }
checkArgument(
writeDisposition != WriteDisposition.WRITE_TRUNCATE,
@@ -1965,7 +1972,9 @@ public class BigQueryIO {
if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
return input.apply(
new StreamWithDeDup(getTable(), tableRefFunction,
- NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
+ jsonSchema == null ? null : NestedValueProvider.of(
+ jsonSchema, new JsonSchemaToTableSchema()),
+ createDisposition, bqServices));
}
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -2608,16 +2617,19 @@ public class BigQueryIO {
* Implementation of DoFn to perform streaming BigQuery write.
*/
@SystemDoFnInternal
- private static class StreamingWriteFn
+ @VisibleForTesting
+ static class StreamingWriteFn
extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
/** TableSchema in JSON. Use String to make the class Serializable. */
- private final ValueProvider<String> jsonTableSchema;
+ @Nullable private final ValueProvider<String> jsonTableSchema;
private final BigQueryServices bqServices;
/** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
private transient Map<String, List<TableRow>> tableRows;
+ private final Write.CreateDisposition createDisposition;
+
/** The list of unique ids for each BigQuery table row. */
private transient Map<String, List<String>> uniqueIdsForTableRows;
@@ -2631,9 +2643,12 @@ public class BigQueryIO {
createAggregator("ByteCount", Sum.ofLongs());
/** Constructor. */
- StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
- this.jsonTableSchema =
+ StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
+ Write.CreateDisposition createDisposition,
+ BigQueryServices bqServices) {
+ this.jsonTableSchema = schema == null ? null :
NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
+ this.createDisposition = createDisposition;
this.bqServices = checkNotNull(bqServices, "bqServices");
}
@@ -2689,7 +2704,8 @@ public class BigQueryIO {
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws InterruptedException, IOException {
TableReference tableReference = parseTableSpec(tableSpec);
- if (!createdTables.contains(tableSpec)) {
+ if (createDisposition != createDisposition.CREATE_NEVER
+ && !createdTables.contains(tableSpec)) {
synchronized (createdTables) {
// Another thread may have succeeded in creating the table in the meanwhile, so
// check again. This check isn't needed for correctness, but we add it to prevent
@@ -2945,19 +2961,22 @@ public class BigQueryIO {
* it leverages BigQuery best effort de-dup mechanism.
*/
private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
- private final transient ValueProvider<TableReference> tableReference;
- private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
- private final transient ValueProvider<TableSchema> tableSchema;
+ @Nullable private final transient ValueProvider<TableReference> tableReference;
+ @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+ @Nullable private final transient ValueProvider<TableSchema> tableSchema;
+ private final Write.CreateDisposition createDisposition;
private final BigQueryServices bqServices;
/** Constructor. */
StreamWithDeDup(ValueProvider<TableReference> tableReference,
- SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- ValueProvider<TableSchema> tableSchema,
+ @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+ @Nullable ValueProvider<TableSchema> tableSchema,
+ Write.CreateDisposition createDisposition,
BigQueryServices bqServices) {
this.tableReference = tableReference;
this.tableRefFunction = tableRefFunction;
this.tableSchema = tableSchema;
+ this.createDisposition = createDisposition;
this.bqServices = checkNotNull(bqServices, "bqServices");
}
@@ -2989,7 +3008,7 @@ public class BigQueryIO {
tagged
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
.apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices)));
+ .apply(ParDo.of(new StreamingWriteFn(tableSchema, createDisposition, bqServices)));
// Note that the implementation to return PDone here breaks the
// implicit assumption about the job execution order. If a user
http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3e8c2c9..ba7f44e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1523,6 +1523,42 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ public void testStreamingWriteFnCreateNever() throws Exception {
+ BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
+ null, CreateDisposition.CREATE_NEVER, new FakeBigQueryServices());
+ assertEquals(BigQueryIO.parseTableSpec("dataset.table"),
+ fn.getOrCreateTable(null, "dataset.table"));
+ }
+
+ @Test
+ public void testCreateNeverWithStreaming() throws Exception {
+ BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ options.setProject("project");
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+
+ TableReference tableRef = new TableReference();
+ tableRef.setDatasetId("dataset");
+ tableRef.setTableId("sometable");
+
+ PCollection<TableRow> tableRows =
+ p.apply(CountingInput.unbounded())
+ .apply(
+ MapElements.via(
+ new SimpleFunction<Long, TableRow>() {
+ @Override
+ public TableRow apply(Long input) {
+ return null;
+ }
+ }))
+ .setCoder(TableRowJsonCoder.of());
+ tableRows
+ .apply(BigQueryIO.Write.to(tableRef)
+ .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+ .withoutValidation());
+ }
+
+ @Test
public void testTableParsing() {
TableReference ref = BigQueryIO
.parseTableSpec("my-project:data_set.table_name");
[2/2] beam git commit: This closes #1590
Posted by dh...@apache.org.
This closes #1590
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f2389ab7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f2389ab7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f2389ab7
Branch: refs/heads/master
Commit: f2389ab7ba1d562d23420d7e2ecd638524439dc6
Parents: 11c3cd7 dc36952
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 24 14:41:55 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 24 14:41:55 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 51 ++++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++++++
2 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------