You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:27 UTC
[19/50] incubator-beam git commit: BigQueryIO: fix streaming write, typo in API
BigQueryIO: fix streaming write and typo in API, and improve testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fb4f5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fb4f5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fb4f5de
Branch: refs/heads/gearpump-runner
Commit: 5fb4f5de9515db717818f1e3ffd7ca3c6eba5614
Parents: 4206408
Author: Sam McVeety <sg...@google.com>
Authored: Fri Dec 16 18:10:28 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Dec 16 23:53:49 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 23 +++++--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 72 ++++++++++++--------
2 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 28049ed..7bb1e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -370,7 +370,8 @@ public class BigQueryIO {
}
}
- private static class TableSpecToTableRef
+ @VisibleForTesting
+ static class TableSpecToTableRef
implements SerializableFunction<String, TableReference> {
@Override
public TableReference apply(String from) {
@@ -807,6 +808,7 @@ public class BigQueryIO {
/**
* Returns the query to be read, or {@code null} if reading from a table instead.
*/
+ @Nullable
public String getQuery() {
return query == null ? null : query.get();
}
@@ -814,7 +816,8 @@ public class BigQueryIO {
/**
* Returns the query to be read, or {@code null} if reading from a table instead.
*/
- public ValueProvider<String> getQueryProivder() {
+ @Nullable
+ public ValueProvider<String> getQueryProvider() {
return query;
}
@@ -2813,7 +2816,8 @@ public class BigQueryIO {
* a randomUUID is generated only once per bucket of data. The actual unique
* id is created by concatenating this randomUUID with a sequential number.
*/
- private static class TagWithUniqueIdsAndTable
+ @VisibleForTesting
+ static class TagWithUniqueIdsAndTable
extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
/** TableSpec to write to. */
private final ValueProvider<String> tableSpec;
@@ -2830,8 +2834,12 @@ public class BigQueryIO {
checkArgument(table == null ^ tableRefFunction == null,
"Exactly one of table or tableRefFunction should be set");
if (table != null) {
- if (table.isAccessible() && table.get().getProjectId() == null) {
- table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
+ if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+ TableReference tableRef = table.get()
+ .setProjectId(options.as(BigQueryOptions.class).getProject());
+ table = NestedValueProvider.of(
+ StaticValueProvider.of(toJsonString(tableRef)),
+ new JsonTableRefToTableRef());
}
this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
} else {
@@ -2870,6 +2878,11 @@ public class BigQueryIO {
}
}
+ @VisibleForTesting
+ ValueProvider<String> getTableSpec() {
+ return tableSpec;
+ }
+
private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
if (tableSpec != null) {
return tableSpec.get();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index b78316f..dc566d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -2242,43 +2243,60 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
+ public void testRuntimeOptionsNotCalledInApplyInputTable() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
- FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
+ bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- pipeline
- .apply(BigQueryIO.Read
- .from(options.getInputTable()).withoutValidation()
- .withTestServices(fakeBqServices))
- .apply(BigQueryIO.Write
- .to(options.getOutputTable())
- .withSchema(NestedValueProvider.of(
- options.getOutputSchema(), new JsonSchemaToTableSchema()))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+ options.getInputTable()).withoutValidation();
+ pipeline.apply(read);
+ // Test that this doesn't throw.
+ DisplayData.from(read);
}
@Test
- public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
+ public void testRuntimeOptionsNotCalledInApplyInputQuery() {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
- FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService());
+ bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
+ BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+ options.getInputQuery()).withoutValidation();
+ pipeline.apply(read);
+ // Test that this doesn't throw.
+ DisplayData.from(read);
+ }
+
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyOutput() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setTempLocation("gs://testbucket/testdir");
+ Pipeline pipeline = TestPipeline.create(options);
+ BigQueryIO.Write.Bound write = BigQueryIO.Write
+ .to(options.getOutputTable())
+ .withSchema(NestedValueProvider.of(
+ options.getOutputSchema(), new JsonSchemaToTableSchema()))
+ .withoutValidation();
pipeline
- .apply(BigQueryIO.Read
- .fromQuery(options.getInputQuery()).withoutValidation()
- .withTestServices(fakeBqServices))
- .apply(BigQueryIO.Write
- .to(options.getOutputTable())
- .withSchema(NestedValueProvider.of(
- options.getOutputSchema(), new JsonSchemaToTableSchema()))
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ .apply(Create.<TableRow>of())
+ .apply(write);
+ // Test that this doesn't throw.
+ DisplayData.from(write);
+ }
+
+ @Test
+ public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
+ BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+ bqOptions.setProject("project");
+ BigQueryIO.TagWithUniqueIdsAndTable tag =
+ new BigQueryIO.TagWithUniqueIdsAndTable(
+ bqOptions, NestedValueProvider.of(
+ StaticValueProvider.of("data_set.table_name"),
+ new BigQueryIO.TableSpecToTableRef()), null);
+ TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+ assertNotNull(table.getProjectId());
}
private static void testNumFiles(File tempDir, int expectedNumFiles) {