You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:45 UTC
[11/50] [abbrv] beam git commit: Use tableRefFunction throughout BigQueryIO. Constant table writes use ConstantTableSpecFunction.
Use tableRefFunction throughout BigQueryIO. Constant table writes use ConstantTableSpecFunction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c939a436
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c939a436
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c939a436
Branch: refs/heads/DSL_SQL
Commit: c939a43617cdb37228625a34b3545377b142fc8a
Parents: e0df7d8
Author: Reuven Lax <re...@google.com>
Authored: Tue Mar 28 11:21:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:49 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 57 ++++++++++----------
.../sdk/io/gcp/bigquery/StreamWithDeDup.java | 4 +-
.../gcp/bigquery/TagWithUniqueIdsAndTable.java | 57 ++++++--------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 19 ++-----
4 files changed, 50 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 9753da5..af0d561 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -700,7 +700,8 @@ public class BigQueryIO {
abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
abstract Builder<T> setTableRefFunction(
SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction);
- abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
+ abstract Builder<T> setFormatFunction(
+ SerializableFunction<T, TableRow> formatFunction);
abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
@@ -781,7 +782,8 @@ public class BigQueryIO {
/** Ensures that methods of the to() family are called at most once. */
private void ensureToNotCalledYet() {
checkState(
- getJsonTableRef() == null && getTable() == null, "to() already called");
+ getJsonTableRef() == null && getTable() == null
+ && getTableRefFunction() == null, "to() already called");
}
/**
@@ -805,6 +807,8 @@ public class BigQueryIO {
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
+ .setTableRefFunction(new TranslateTableSpecFunction<T>(
+ new ConstantTableSpecFunction<T>(tableSpec)))
.build();
}
@@ -812,7 +816,8 @@ public class BigQueryIO {
* Writes to table specified by the specified table function. The table is a function of
* {@link ValueInSingleWindow}, so can be determined by the value or by the window.
*/
- public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
+ public Write<T> to(
+ SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
}
@@ -848,6 +853,20 @@ public class BigQueryIO {
}
}
+ static class ConstantTableSpecFunction<T> implements
+ SerializableFunction<ValueInSingleWindow<T>, String> {
+ private ValueProvider<String> tableSpec;
+
+ ConstantTableSpecFunction(ValueProvider<String> tableSpec) {
+ this.tableSpec = tableSpec;
+ }
+
+ @Override
+ public String apply(ValueInSingleWindow<T> value) {
+ return tableSpec.get();
+ }
+ }
+
/**
* Uses the specified schema for rows to be written.
*
@@ -900,13 +919,8 @@ public class BigQueryIO {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
// Exactly one of the table and table reference can be configured.
- checkState(
- getJsonTableRef() != null || getTableRefFunction() != null,
+ checkState(getTableRefFunction() != null,
"must set the table reference of a BigQueryIO.Write transform");
- checkState(
- getJsonTableRef() == null || getTableRefFunction() == null,
- "Cannot set both a table reference and a table function for a BigQueryIO.Write"
- + " transform");
checkArgument(getFormatFunction() != null,
"A function must be provided to convert type into a TableRow. "
@@ -920,6 +934,7 @@ public class BigQueryIO {
// The user specified a table.
if (getJsonTableRef() != null && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
+ // TODO: This seems wrong - what if the ValueProvider is not accessible?
DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
@@ -935,24 +950,12 @@ public class BigQueryIO {
}
}
- if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
// We will use BigQuery's streaming write API -- validate supported dispositions.
- if (getTableRefFunction() != null) {
- checkArgument(
- getCreateDisposition() != CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
- + " function.");
- }
- if (getJsonSchema() == null) {
- checkArgument(
- getCreateDisposition() == CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
- }
-
checkArgument(
getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
- "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
- + " when using a tablespec function.");
+ "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
+ + " PCollection.");
} else {
// We will use a BigQuery load job -- validate the temp location.
String tempLocation = options.getTempLocation();
@@ -977,7 +980,7 @@ public class BigQueryIO {
public WriteResult expand(PCollection<T> input) {
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
- if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
+ if (input.isBounded() == IsBounded.UNBOUNDED) {
return input.apply(new StreamWithDeDup<T>(this));
} else {
return input.apply(new BatchLoadBigQuery<T>(this));
@@ -1026,12 +1029,12 @@ public class BigQueryIO {
*
* <p>If the table's project is not specified, use the executing project.
*/
- @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
- BigQueryOptions bqOptions) {
+ @Nullable ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
ValueProvider<TableReference> table = getTable();
if (table == null) {
return table;
}
+
if (!table.isAccessible()) {
LOG.info("Using a dynamic value for table input. This must contain a project"
+ " in the table reference: {}", table);
http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
index 1fa26d1..506a564 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -64,8 +64,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
- input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
- write.getTableRefFunction(), write.getFormatFunction())));
+ input.getPipeline().getOptions().as(BigQueryOptions.class), write)));
// To prevent having the same TableRow processed more than once with regenerated
// different unique ids, this implementation relies on "checkpointing", which is
@@ -85,6 +84,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
write.getCreateDisposition(),
write.getTableDescription(),
write.getBigQueryServices())));
+
return WriteResult.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
index a6608e4..8d7d1e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
@@ -18,23 +18,18 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
@@ -49,39 +44,22 @@ import org.apache.beam.sdk.values.ValueInSingleWindow;
@VisibleForTesting
class TagWithUniqueIdsAndTable<T>
extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
- /** TableSpec to write to. */
- private final ValueProvider<String> tableSpec;
-
- /** User function mapping windowed values to {@link TableReference} in JSON. */
- private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
+ /** TableSpec to write to in the case of a single static destination. */
+ private ValueProvider<String> tableSpec = null;
- /** User function mapping user type to a TableRow. */
- private final SerializableFunction<T, TableRow> formatFunction;
+ private final Write<T, ?> write;
private transient String randomUUID;
private transient long sequenceNo = 0L;
TagWithUniqueIdsAndTable(BigQueryOptions options,
- ValueProvider<TableReference> table,
- SerializableFunction<ValueInSingleWindow<T>, TableReference>
- tableRefFunction,
- SerializableFunction<T, TableRow> formatFunction) {
- checkArgument(table == null ^ tableRefFunction == null,
- "Exactly one of table or tableRefFunction should be set");
+ Write<T, ?> write) {
+ ValueProvider<TableReference> table = write.getTableWithDefaultProject(
+ options.as(BigQueryOptions.class));
if (table != null) {
- if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
- TableReference tableRef = table.get()
- .setProjectId(options.as(BigQueryOptions.class).getProject());
- table = NestedValueProvider.of(
- StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
- new JsonTableRefToTableRef());
- }
this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
- } else {
- tableSpec = null;
}
- this.tableRefFunction = tableRefFunction;
- this.formatFunction = formatFunction;
+ this.write = write;
}
@@ -101,7 +79,7 @@ class TagWithUniqueIdsAndTable<T>
// We output on keys 0-50 to ensure that there's enough batching for
// BigQuery.
context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
- new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
+ new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId)));
}
@Override
@@ -109,10 +87,8 @@ class TagWithUniqueIdsAndTable<T>
super.populateDisplayData(builder);
builder.addIfNotNull(DisplayData.item("table", tableSpec));
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass())
.withLabel("Table Reference Function"));
- }
}
@VisibleForTesting
@@ -120,16 +96,13 @@ class TagWithUniqueIdsAndTable<T>
return tableSpec;
}
+
private String tableSpecFromWindowedValue(BigQueryOptions options,
ValueInSingleWindow<T> value) {
- if (tableSpec != null) {
- return tableSpec.get();
- } else {
- TableReference table = tableRefFunction.apply(value);
- if (table.getProjectId() == null) {
- table.setProjectId(options.getProject());
- }
- return BigQueryHelpers.toTableSpec(table);
+ TableReference table = write.getTableRefFunction().apply(value);
+ if (Strings.isNullOrEmpty(table.getProjectId())) {
+ table.setProjectId(options.getProject());
}
+ return BigQueryHelpers.toTableSpec(table);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 83fd8d9..499aa74 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -103,7 +102,6 @@ import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -150,6 +148,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -1375,7 +1374,8 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteDefaultProject() {
- BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to("somedataset.sometable");
+ BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
+ .to("somedataset" + ".sometable");
checkWriteObject(
write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
@@ -2350,19 +2350,6 @@ public class BigQueryIOTest implements Serializable {
DisplayData.from(write);
}
- @Test
- public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
- BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
- bqOptions.setProject("project");
- TagWithUniqueIdsAndTable<TableRow> tag =
- new TagWithUniqueIdsAndTable<TableRow>(
- bqOptions, NestedValueProvider.of(
- StaticValueProvider.of("data_set.table_name"),
- new TableSpecToTableRef()), null, null);
- TableReference table = BigQueryHelpers.parseTableSpec(tag.getTableSpec().get());
- assertNotNull(table.getProjectId());
- }
-
private static void testNumFiles(File tempDir, int expectedNumFiles) {
assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() {
@Override