You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/12 19:14:48 UTC
[1/2] incubator-beam git commit: BigQueryIO.Write: support runtime schema and table
Repository: incubator-beam
Updated Branches:
refs/heads/master 437393712 -> 321547fb1
BigQueryIO.Write: support runtime schema and table
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd6d09c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd6d09c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd6d09c3
Branch: refs/heads/master
Commit: fd6d09c32f6bcf67c63ec74548373ee90d67f2bd
Parents: 4373937
Author: Sam McVeety <sg...@google.com>
Authored: Sun Dec 4 14:16:23 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Dec 12 11:14:20 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 217 +++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 60 ++++-
2 files changed, 206 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f99ca78..0be8567 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -321,6 +321,23 @@ public class BigQueryIO {
return sb.toString();
}
+ @VisibleForTesting
+ static class JsonSchemaToTableSchema
+ implements SerializableFunction<String, TableSchema> {
+ @Override
+ public TableSchema apply(String from) {
+ return fromJsonString(from, TableSchema.class);
+ }
+ }
+
+ private static class TableSchemaToJsonSchema
+ implements SerializableFunction<TableSchema, String> {
+ @Override
+ public String apply(TableSchema from) {
+ return toJsonString(from);
+ }
+ }
+
private static class JsonTableRefToTableRef
implements SerializableFunction<String, TableReference> {
@Override
@@ -329,6 +346,14 @@ public class BigQueryIO {
}
}
+ private static class TableRefToTableSpec
+ implements SerializableFunction<TableReference, String> {
+ @Override
+ public String apply(TableReference from) {
+ return toTableSpec(from);
+ }
+ }
+
private static class TableRefToJson
implements SerializableFunction<TableReference, String> {
@Override
@@ -353,6 +378,15 @@ public class BigQueryIO {
}
}
+ @Nullable
+ private static ValueProvider<String> displayTable(
+ @Nullable ValueProvider<TableReference> table) {
+ if (table == null) {
+ return null;
+ }
+ return NestedValueProvider.of(table, new TableRefToTableSpec());
+ }
+
/**
* A {@link PTransform} that reads from a BigQuery table and returns a
* {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
@@ -659,11 +693,11 @@ public class BigQueryIO {
.setProjectId(executingProject)
.setDatasetId(queryTempDatasetId)
.setTableId(queryTempTableId);
+ String jsonTableRef = toJsonString(queryTempTableRef);
source = BigQueryQuerySource.create(
jobIdToken, query, NestedValueProvider.of(
- StaticValueProvider.of(
- toJsonString(queryTempTableRef)), new JsonTableRefToTableRef()),
+ StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()),
flattenResults, useLegacySql, extractDestinationDir, bqServices);
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
@@ -712,17 +746,10 @@ public class BigQueryIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- TableReference table = getTable();
-
- if (table != null) {
- builder.add(DisplayData.item("table", toTableSpec(table))
- .withLabel("Table"));
- }
- String queryString = query == null
- ? null : query.isAccessible()
- ? query.get() : query.toString();
builder
- .addIfNotNull(DisplayData.item("query", queryString)
+ .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+ .withLabel("Table"))
+ .addIfNotNull(DisplayData.item("query", query)
.withLabel("Query"))
.addIfNotNull(DisplayData.item("flattenResults", flattenResults)
.withLabel("Flatten Query Results"))
@@ -752,10 +779,10 @@ public class BigQueryIO {
if (Strings.isNullOrEmpty(table.get().getProjectId())) {
// If user does not specify a project we assume the table to be located in
// the default project.
- TableReference ref = table.get();
- ref.setProjectId(bqOptions.getProject());
+ TableReference tableRef = table.get();
+ tableRef.setProjectId(bqOptions.getProject());
return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(ref)), new JsonTableRefToTableRef());
+ toJsonString(tableRef)), new JsonTableRefToTableRef());
}
return table;
}
@@ -941,8 +968,7 @@ public class BigQueryIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String table = jsonTable.isAccessible() ? jsonTable.get() : jsonTable.toString();
- builder.add(DisplayData.item("table", table));
+ builder.add(DisplayData.item("table", jsonTable));
}
}
@@ -1060,7 +1086,7 @@ public class BigQueryIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("query", query.get()));
+ builder.add(DisplayData.item("query", query));
}
private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
@@ -1516,6 +1542,11 @@ public class BigQueryIO {
}
/** Creates a write transformation for the given table. */
+ public static Bound to(ValueProvider<String> tableSpec) {
+ return new Bound().to(tableSpec);
+ }
+
+ /** Creates a write transformation for the given table. */
public static Bound to(TableReference table) {
return new Bound().to(table);
}
@@ -1558,6 +1589,13 @@ public class BigQueryIO {
return new Bound().withSchema(schema);
}
+ /**
+ * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ */
+ public static Bound withSchema(ValueProvider<TableSchema> schema) {
+ return new Bound().withSchema(schema);
+ }
+
/** Creates a write transformation with the specified options for creating the table. */
public static Bound withCreateDisposition(CreateDisposition disposition) {
return new Bound().withCreateDisposition(disposition);
@@ -1593,12 +1631,12 @@ public class BigQueryIO {
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
- @Nullable final String jsonTableRef;
+ @Nullable final ValueProvider<String> jsonTableRef;
@Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
// Table schema. The schema is required only if the table does not exist.
- @Nullable final String jsonSchema;
+ @Nullable final ValueProvider<String> jsonSchema;
// Options for creating the table. Valid values are CREATE_IF_NEEDED and
// CREATE_NEVER.
@@ -1645,9 +1683,9 @@ public class BigQueryIO {
null /* bigQueryServices */);
}
- private Bound(String name, @Nullable String jsonTableRef,
+ private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
@Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable String jsonSchema,
+ @Nullable ValueProvider<String> jsonSchema,
CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
@Nullable BigQueryServices bigQueryServices) {
super(name);
@@ -1667,7 +1705,8 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound to(String tableSpec) {
- return to(parseTableSpec(tableSpec));
+ return toTableRef(NestedValueProvider.of(
+ StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
}
/**
@@ -1676,7 +1715,28 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound to(TableReference table) {
- return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
+ return to(StaticValueProvider.of(toTableSpec(table)));
+ }
+
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound to(ValueProvider<String> tableSpec) {
+ return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+ }
+
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ private Bound toTableRef(ValueProvider<TableReference> table) {
+ return new Bound(name,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, bigQueryServices);
}
@@ -1716,7 +1776,17 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound withSchema(TableSchema schema) {
- return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
+ return new Bound(name, jsonTableRef, tableRefFunction,
+ StaticValueProvider.of(toJsonString(schema)),
+ createDisposition, writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ */
+ public Bound withSchema(ValueProvider<TableSchema> schema) {
+ return new Bound(name, jsonTableRef, tableRefFunction,
+ NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
createDisposition, writeDisposition, validate, bigQueryServices);
}
@@ -1798,7 +1868,7 @@ public class BigQueryIO {
// The user specified a table.
if (jsonTableRef != null && validate) {
- TableReference table = getTableWithDefaultProject(options);
+ TableReference table = getTableWithDefaultProject(options).get();
DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
@@ -1855,10 +1925,11 @@ public class BigQueryIO {
// StreamWithDeDup and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
return input.apply(
- new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
+ new StreamWithDeDup(getTable(), tableRefFunction,
+ NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
}
- TableReference table = getTableWithDefaultProject(options);
+ ValueProvider<TableReference> table = getTableWithDefaultProject(options);
String jobIdToken = "beam_job_" + randomUUIDString();
String tempLocation = options.getTempLocation();
@@ -1909,7 +1980,7 @@ public class BigQueryIO {
bqServices,
jobIdToken,
tempFilePrefix,
- toJsonString(table),
+ NestedValueProvider.of(table, new TableRefToJson()),
jsonSchema,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED)));
@@ -1920,7 +1991,7 @@ public class BigQueryIO {
.of(new WriteRename(
bqServices,
jobIdToken,
- toJsonString(table),
+ NestedValueProvider.of(table, new TableRefToJson()),
writeDisposition,
createDisposition,
tempTablesView))
@@ -1934,7 +2005,7 @@ public class BigQueryIO {
bqServices,
jobIdToken,
tempFilePrefix,
- toJsonString(table),
+ NestedValueProvider.of(table, new TableRefToJson()),
jsonSchema,
writeDisposition,
createDisposition)));
@@ -2031,7 +2102,8 @@ public class BigQueryIO {
/** Returns the table schema. */
public TableSchema getSchema() {
- return fromJsonString(jsonSchema, TableSchema.class);
+ return fromJsonString(
+ jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
}
/**
@@ -2039,20 +2111,32 @@ public class BigQueryIO {
*
* <p>If the table's project is not specified, use the executing project.
*/
- @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
- TableReference table = getTable();
- if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
+ @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ BigQueryOptions bqOptions) {
+ ValueProvider<TableReference> table = getTable();
+ if (table == null) {
+ return table;
+ }
+ if (!table.isAccessible()) {
+ LOG.info("Using a dynamic value for table input. This must contain a project"
+ + " in the table reference: {}", table);
+ return table;
+ }
+ if (Strings.isNullOrEmpty(table.get().getProjectId())) {
// If user does not specify a project we assume the table to be located in
// the default project.
- table.setProjectId(bqOptions.getProject());
+ TableReference tableRef = table.get();
+ tableRef.setProjectId(bqOptions.getProject());
+ return NestedValueProvider.of(StaticValueProvider.of(
+ toJsonString(tableRef)), new JsonTableRefToTableRef());
}
return table;
}
/** Returns the table reference, or {@code null}. */
@Nullable
- public TableReference getTable() {
- return fromJsonString(jsonTableRef, TableReference.class);
+ public ValueProvider<TableReference> getTable() {
+ return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
}
/** Returns {@code true} if table validation is enabled. */
@@ -2172,8 +2256,8 @@ public class BigQueryIO {
private final BigQueryServices bqServices;
private final String jobIdToken;
private final String tempFilePrefix;
- private final String jsonTableRef;
- private final String jsonSchema;
+ private final ValueProvider<String> jsonTableRef;
+ private final ValueProvider<String> jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
@@ -2182,8 +2266,8 @@ public class BigQueryIO {
BigQueryServices bqServices,
String jobIdToken,
String tempFilePrefix,
- String jsonTableRef,
- String jsonSchema,
+ ValueProvider<String> jsonTableRef,
+ ValueProvider<String> jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) {
this.singlePartition = singlePartition;
@@ -2200,7 +2284,7 @@ public class BigQueryIO {
public void processElement(ProcessContext c) throws Exception {
List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
- TableReference ref = fromJsonString(jsonTableRef, TableReference.class);
+ TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
if (!singlePartition) {
ref.setTableId(jobIdPrefix);
}
@@ -2209,7 +2293,8 @@ public class BigQueryIO {
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
ref,
- fromJsonString(jsonSchema, TableSchema.class),
+ fromJsonString(
+ jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
partition,
writeDisposition,
createDisposition);
@@ -2242,16 +2327,15 @@ public class BigQueryIO {
.setProjectId(projectId)
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
- Job job = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
- Status jobStatus = parseStatus(job);
+ Status jobStatus =
+ parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
switch (jobStatus) {
case SUCCEEDED:
return;
case UNKNOWN:
throw new RuntimeException("Failed to poll the load job status of job " + jobId);
case FAILED:
- LOG.info("BigQuery load job failed. Status: {} Details: {}",
- jobId, job.getStatus());
+ LOG.info("BigQuery load job failed: {}", jobId);
continue;
default:
throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
@@ -2306,7 +2390,7 @@ public class BigQueryIO {
static class WriteRename extends DoFn<String, Void> {
private final BigQueryServices bqServices;
private final String jobIdToken;
- private final String jsonTableRef;
+ private final ValueProvider<String> jsonTableRef;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final PCollectionView<Iterable<String>> tempTablesView;
@@ -2314,7 +2398,7 @@ public class BigQueryIO {
public WriteRename(
BigQueryServices bqServices,
String jobIdToken,
- String jsonTableRef,
+ ValueProvider<String> jsonTableRef,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
PCollectionView<Iterable<String>> tempTablesView) {
@@ -2342,7 +2426,7 @@ public class BigQueryIO {
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdToken,
- fromJsonString(jsonTableRef, TableReference.class),
+ fromJsonString(jsonTableRef.get(), TableReference.class),
tempTables,
writeDisposition,
createDisposition);
@@ -2475,7 +2559,7 @@ public class BigQueryIO {
private static class StreamingWriteFn
extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
/** TableSchema in JSON. Use String to make the class Serializable. */
- private final String jsonTableSchema;
+ private final ValueProvider<String> jsonTableSchema;
private final BigQueryServices bqServices;
@@ -2495,8 +2579,9 @@ public class BigQueryIO {
createAggregator("ByteCount", new Sum.SumLongFn());
/** Constructor. */
- StreamingWriteFn(TableSchema schema, BigQueryServices bqServices) {
- this.jsonTableSchema = toJsonString(schema);
+ StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
+ this.jsonTableSchema =
+ NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
this.bqServices = checkNotNull(bqServices, "bqServices");
}
@@ -2549,7 +2634,8 @@ public class BigQueryIO {
// check again. This check isn't needed for correctness, but we add it to prevent
// every thread from attempting a create and overwhelming our BigQuery quota.
if (!createdTables.contains(tableSpec)) {
- TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
+ TableSchema tableSchema = JSON_FACTORY.fromString(
+ jsonTableSchema.get(), TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
@@ -2708,7 +2794,7 @@ public class BigQueryIO {
private static class TagWithUniqueIdsAndTable
extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
/** TableSpec to write to. */
- private final String tableSpec;
+ private final ValueProvider<String> tableSpec;
/** User function mapping windows to {@link TableReference} in JSON. */
private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
@@ -2716,15 +2802,16 @@ public class BigQueryIO {
private transient String randomUUID;
private transient long sequenceNo = 0L;
- TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table,
+ TagWithUniqueIdsAndTable(BigQueryOptions options,
+ ValueProvider<TableReference> table,
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
checkArgument(table == null ^ tableRefFunction == null,
"Exactly one of table or tableRefFunction should be set");
if (table != null) {
- if (table.getProjectId() == null) {
- table.setProjectId(options.as(BigQueryOptions.class).getProject());
+ if (table.isAccessible() && table.get().getProjectId() == null) {
+ table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
}
- this.tableSpec = toTableSpec(table);
+ this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
} else {
tableSpec = null;
}
@@ -2763,7 +2850,7 @@ public class BigQueryIO {
private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
if (tableSpec != null) {
- return tableSpec;
+ return tableSpec.get();
} else {
TableReference table = tableRefFunction.apply(window);
if (table.getProjectId() == null) {
@@ -2781,15 +2868,15 @@ public class BigQueryIO {
* it leverages BigQuery best effort de-dup mechanism.
*/
private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
- private final transient TableReference tableReference;
+ private final transient ValueProvider<TableReference> tableReference;
private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
- private final transient TableSchema tableSchema;
+ private final transient ValueProvider<TableSchema> tableSchema;
private final BigQueryServices bqServices;
/** Constructor. */
- StreamWithDeDup(TableReference tableReference,
+ StreamWithDeDup(ValueProvider<TableReference> tableReference,
SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- TableSchema tableSchema,
+ ValueProvider<TableSchema> tableSchema,
BigQueryServices bqServices) {
this.tableReference = tableReference;
this.tableRefFunction = tableRefFunction;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 25caf63..54ec2bb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -95,6 +95,7 @@ import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
@@ -111,6 +112,8 @@ import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.NeedsRunner;
@@ -643,9 +646,9 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.Write.Bound bound, String project, String dataset, String table,
TableSchema schema, CreateDisposition createDisposition,
WriteDisposition writeDisposition, boolean validate) {
- assertEquals(project, bound.getTable().getProjectId());
- assertEquals(dataset, bound.getTable().getDatasetId());
- assertEquals(table, bound.getTable().getTableId());
+ assertEquals(project, bound.getTable().get().getProjectId());
+ assertEquals(dataset, bound.getTable().get().getDatasetId());
+ assertEquals(table, bound.getTable().get().getTableId());
assertEquals(schema, bound.getSchema());
assertEquals(createDisposition, bound.createDisposition);
assertEquals(writeDisposition, bound.writeDisposition);
@@ -1845,8 +1848,8 @@ public class BigQueryIOTest implements Serializable {
fakeBqServices,
jobIdToken,
tempFilePrefix,
- jsonTable,
- jsonSchema,
+ StaticValueProvider.of(jsonTable),
+ StaticValueProvider.of(jsonSchema),
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED);
@@ -1920,7 +1923,7 @@ public class BigQueryIOTest implements Serializable {
WriteRename writeRename = new WriteRename(
fakeBqServices,
jobIdToken,
- jsonTable,
+ StaticValueProvider.of(jsonTable),
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
tempTablesView);
@@ -1961,6 +1964,51 @@ public class BigQueryIOTest implements Serializable {
logged.verifyNotLogged("Failed to delete the table " + toJsonString(tableRefs.get(2)));
}
+ /** Test options. **/
+ public interface RuntimeTestOptions extends PipelineOptions {
+ ValueProvider<String> getInputTable();
+ void setInputTable(ValueProvider<String> value);
+
+ ValueProvider<String> getInputQuery();
+ void setInputQuery(ValueProvider<String> value);
+
+ ValueProvider<String> getOutputTable();
+ void setOutputTable(ValueProvider<String> value);
+
+ ValueProvider<String> getOutputSchema();
+ void setOutputSchema(ValueProvider<String> value);
+ }
+
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyInputTable() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setTempLocation("gs://testbucket/testdir");
+ Pipeline pipeline = TestPipeline.create(options);
+ pipeline
+ .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation())
+ .apply(BigQueryIO.Write
+ .to(options.getOutputTable())
+ .withSchema(NestedValueProvider.of(
+ options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withoutValidation());
+ }
+
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyInputQuery() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setTempLocation("gs://testbucket/testdir");
+ Pipeline pipeline = TestPipeline.create(options);
+ pipeline
+ .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation())
+ .apply(BigQueryIO.Write
+ .to(options.getOutputTable())
+ .withSchema(NestedValueProvider.of(
+ options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withoutValidation());
+ }
+
private static void testNumFiles(File tempDir, int expectedNumFiles) {
assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() {
@Override
[2/2] incubator-beam git commit: Closes #1513
Posted by dh...@apache.org.
Closes #1513
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/321547fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/321547fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/321547fb
Branch: refs/heads/master
Commit: 321547fb15c358fcd196954779548f6644aa3c08
Parents: 4373937 fd6d09c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Dec 12 11:14:41 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Dec 12 11:14:41 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 217 +++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 60 ++++-
2 files changed, 206 insertions(+), 71 deletions(-)
----------------------------------------------------------------------