You are viewing a plain text version of this content. The canonical link to the original message was included in the HTML version but is not preserved here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:46 UTC
[4/6] beam git commit: Refactor BigQueryIO helper methods into a
BigQueryHelpers class. Move helper transforms for BigQueryIO.Read into
individual files. Change private to package-private in BigQueryHelpers.
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 35bb9c3..eca95b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
@@ -101,13 +101,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.JsonSchemaToTableSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -116,6 +112,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -1582,7 +1579,7 @@ public class BigQueryIOTest implements Serializable {
public void testStreamingWriteFnCreateNever() throws Exception {
BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
- assertEquals(BigQueryIO.parseTableSpec("dataset.table"),
+ assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
fn.getOrCreateTable(null, "dataset.table"));
}
@@ -1616,7 +1613,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testTableParsing() {
- TableReference ref = BigQueryIO
+ TableReference ref = BigQueryHelpers
.parseTableSpec("my-project:data_set.table_name");
Assert.assertEquals("my-project", ref.getProjectId());
Assert.assertEquals("data_set", ref.getDatasetId());
@@ -1625,14 +1622,14 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testTableParsing_validPatterns() {
- BigQueryIO.parseTableSpec("a123-456:foo_bar.d");
- BigQueryIO.parseTableSpec("a12345:b.c");
- BigQueryIO.parseTableSpec("b12345.c");
+ BigQueryHelpers.parseTableSpec("a123-456:foo_bar.d");
+ BigQueryHelpers.parseTableSpec("a12345:b.c");
+ BigQueryHelpers.parseTableSpec("b12345.c");
}
@Test
public void testTableParsing_noProjectId() {
- TableReference ref = BigQueryIO
+ TableReference ref = BigQueryHelpers
.parseTableSpec("data_set.table_name");
Assert.assertEquals(null, ref.getProjectId());
Assert.assertEquals("data_set", ref.getDatasetId());
@@ -1642,25 +1639,25 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testTableParsingError() {
thrown.expect(IllegalArgumentException.class);
- BigQueryIO.parseTableSpec("0123456:foo.bar");
+ BigQueryHelpers.parseTableSpec("0123456:foo.bar");
}
@Test
public void testTableParsingError_2() {
thrown.expect(IllegalArgumentException.class);
- BigQueryIO.parseTableSpec("myproject:.bar");
+ BigQueryHelpers.parseTableSpec("myproject:.bar");
}
@Test
public void testTableParsingError_3() {
thrown.expect(IllegalArgumentException.class);
- BigQueryIO.parseTableSpec(":a.b");
+ BigQueryHelpers.parseTableSpec(":a.b");
}
@Test
public void testTableParsingError_slash() {
thrown.expect(IllegalArgumentException.class);
- BigQueryIO.parseTableSpec("a\\b12345:c.d");
+ BigQueryHelpers.parseTableSpec("a\\b12345:c.d");
}
// Test that BigQuery's special null placeholder objects can be encoded.
@@ -1709,7 +1706,7 @@ public class BigQueryIOTest implements Serializable {
toJsonString(new TableRow().set("name", "c").set("number", "3")));
String jobIdToken = "testJobIdToken";
- TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
+ TableReference table = BigQueryHelpers.parseTableSpec("project.data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
@@ -1748,7 +1745,7 @@ public class BigQueryIOTest implements Serializable {
toJsonString(new TableRow().set("name", "c").set("number", "3")));
String jobIdToken = "testJobIdToken";
- TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
+ TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
@@ -1815,7 +1812,7 @@ public class BigQueryIOTest implements Serializable {
String jobIdToken = "testJobIdToken";
String extractDestinationDir = "mock://tempLocation";
- TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+ TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
StaticValueProvider.of(destinationTable),
@@ -1904,7 +1901,7 @@ public class BigQueryIOTest implements Serializable {
String jobIdToken = "testJobIdToken";
String extractDestinationDir = "mock://tempLocation";
- TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+ TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
StaticValueProvider.of(destinationTable),
@@ -2285,9 +2282,12 @@ public class BigQueryIOTest implements Serializable {
String datasetId = "somedataset";
List<String> tables = Lists.newArrayList("table1", "table2", "table3");
List<TableReference> tableRefs = Lists.newArrayList(
- BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(0))),
- BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(1))),
- BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(2))));
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+ tables.get(0))),
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+ tables.get(1))),
+ BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+ tables.get(2))));
doThrow(new IOException("Unable to delete table"))
.when(mockDatasetService).deleteTable(tableRefs.get(0));
@@ -2371,8 +2371,8 @@ public class BigQueryIOTest implements Serializable {
new BigQueryIO.TagWithUniqueIdsAndTable<TableRow>(
bqOptions, NestedValueProvider.of(
StaticValueProvider.of("data_set.table_name"),
- new BigQueryIO.TableSpecToTableRef()), null, null);
- TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+ new TableSpecToTableRef()), null, null);
+ TableReference table = BigQueryHelpers.parseTableSpec(tag.getTableSpec().get());
assertNotNull(table.getProjectId());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 7b5b226..fa84119 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -222,7 +222,7 @@ public class BigQueryUtilTest {
List<TableRow> rows = new ArrayList<>();
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"), mockClient)) {
+ BigQueryHelpers.parseTableSpec("project:dataset.table"), mockClient)) {
iterator.open();
while (iterator.advance()) {
rows.add(iterator.getCurrent());
@@ -261,7 +261,7 @@ public class BigQueryUtilTest {
onTableList(dataList);
try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
+ BigQueryHelpers.parseTableSpec("project:dataset.table"),
mockClient)) {
iterator.open();
Assert.assertTrue(iterator.advance());
@@ -291,7 +291,7 @@ public class BigQueryUtilTest {
onTableList(dataList);
try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
+ BigQueryHelpers.parseTableSpec("project:dataset.table"),
mockClient)) {
iterator.open();
@@ -320,7 +320,7 @@ public class BigQueryUtilTest {
.thenReturn(page2);
try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
+ BigQueryHelpers.parseTableSpec("project:dataset.table"),
mockClient)) {
iterator.open();
@@ -350,7 +350,7 @@ public class BigQueryUtilTest {
.thenThrow(new IOException("No such table"));
try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
+ BigQueryHelpers.parseTableSpec("project:dataset.table"),
mockClient)) {
try {
iterator.open(); // throws.
@@ -387,7 +387,7 @@ public class BigQueryUtilTest {
errorsIndices.add(new ArrayList<Long>());
onInsertAll(errorsIndices);
- TableReference ref = BigQueryIO
+ TableReference ref = BigQueryHelpers
.parseTableSpec("project:dataset.table");
DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5);