You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:46 UTC

[4/6] beam git commit: Refactor BigQueryIO helper methods into a BigQueryHelpers class. Move helper transforms for BigQueryIO.Read into individual files. Change the visibility of BigQueryHelpers members from private to package-private.

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 35bb9c3..eca95b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
@@ -101,13 +101,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.JsonSchemaToTableSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -116,6 +112,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -1582,7 +1579,7 @@ public class BigQueryIOTest implements Serializable {
   public void testStreamingWriteFnCreateNever() throws Exception {
     BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
         null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
-    assertEquals(BigQueryIO.parseTableSpec("dataset.table"),
+    assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
         fn.getOrCreateTable(null, "dataset.table"));
   }
 
@@ -1616,7 +1613,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testTableParsing() {
-    TableReference ref = BigQueryIO
+    TableReference ref = BigQueryHelpers
         .parseTableSpec("my-project:data_set.table_name");
     Assert.assertEquals("my-project", ref.getProjectId());
     Assert.assertEquals("data_set", ref.getDatasetId());
@@ -1625,14 +1622,14 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testTableParsing_validPatterns() {
-    BigQueryIO.parseTableSpec("a123-456:foo_bar.d");
-    BigQueryIO.parseTableSpec("a12345:b.c");
-    BigQueryIO.parseTableSpec("b12345.c");
+    BigQueryHelpers.parseTableSpec("a123-456:foo_bar.d");
+    BigQueryHelpers.parseTableSpec("a12345:b.c");
+    BigQueryHelpers.parseTableSpec("b12345.c");
   }
 
   @Test
   public void testTableParsing_noProjectId() {
-    TableReference ref = BigQueryIO
+    TableReference ref = BigQueryHelpers
         .parseTableSpec("data_set.table_name");
     Assert.assertEquals(null, ref.getProjectId());
     Assert.assertEquals("data_set", ref.getDatasetId());
@@ -1642,25 +1639,25 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testTableParsingError() {
     thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("0123456:foo.bar");
+    BigQueryHelpers.parseTableSpec("0123456:foo.bar");
   }
 
   @Test
   public void testTableParsingError_2() {
     thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("myproject:.bar");
+    BigQueryHelpers.parseTableSpec("myproject:.bar");
   }
 
   @Test
   public void testTableParsingError_3() {
     thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec(":a.b");
+    BigQueryHelpers.parseTableSpec(":a.b");
   }
 
   @Test
   public void testTableParsingError_slash() {
     thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("a\\b12345:c.d");
+    BigQueryHelpers.parseTableSpec("a\\b12345:c.d");
   }
 
   // Test that BigQuery's special null placeholder objects can be encoded.
@@ -1709,7 +1706,7 @@ public class BigQueryIOTest implements Serializable {
             toJsonString(new TableRow().set("name", "c").set("number", "3")));
 
     String jobIdToken = "testJobIdToken";
-    TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
+    TableReference table = BigQueryHelpers.parseTableSpec("project.data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
@@ -1748,7 +1745,7 @@ public class BigQueryIOTest implements Serializable {
             toJsonString(new TableRow().set("name", "c").set("number", "3")));
 
     String jobIdToken = "testJobIdToken";
-    TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
@@ -1815,7 +1812,7 @@ public class BigQueryIOTest implements Serializable {
 
     String jobIdToken = "testJobIdToken";
     String extractDestinationDir = "mock://tempLocation";
-    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
         StaticValueProvider.of(destinationTable),
@@ -1904,7 +1901,7 @@ public class BigQueryIOTest implements Serializable {
 
     String jobIdToken = "testJobIdToken";
     String extractDestinationDir = "mock://tempLocation";
-    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
         StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
         StaticValueProvider.of(destinationTable),
@@ -2285,9 +2282,12 @@ public class BigQueryIOTest implements Serializable {
     String datasetId = "somedataset";
     List<String> tables = Lists.newArrayList("table1", "table2", "table3");
     List<TableReference> tableRefs = Lists.newArrayList(
-        BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(0))),
-        BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(1))),
-        BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(2))));
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+            tables.get(0))),
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+            tables.get(1))),
+        BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId,
+            tables.get(2))));
 
     doThrow(new IOException("Unable to delete table"))
         .when(mockDatasetService).deleteTable(tableRefs.get(0));
@@ -2371,8 +2371,8 @@ public class BigQueryIOTest implements Serializable {
         new BigQueryIO.TagWithUniqueIdsAndTable<TableRow>(
             bqOptions, NestedValueProvider.of(
                 StaticValueProvider.of("data_set.table_name"),
-                new BigQueryIO.TableSpecToTableRef()), null, null);
-    TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+                new TableSpecToTableRef()), null, null);
+    TableReference table = BigQueryHelpers.parseTableSpec(tag.getTableSpec().get());
     assertNotNull(table.getProjectId());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 7b5b226..fa84119 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -222,7 +222,7 @@ public class BigQueryUtilTest {
     List<TableRow> rows = new ArrayList<>();
     try (BigQueryTableRowIterator iterator =
             BigQueryTableRowIterator.fromTable(
-                BigQueryIO.parseTableSpec("project:dataset.table"), mockClient)) {
+                BigQueryHelpers.parseTableSpec("project:dataset.table"), mockClient)) {
       iterator.open();
       while (iterator.advance()) {
         rows.add(iterator.getCurrent());
@@ -261,7 +261,7 @@ public class BigQueryUtilTest {
     onTableList(dataList);
 
     try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
+        BigQueryHelpers.parseTableSpec("project:dataset.table"),
         mockClient)) {
       iterator.open();
       Assert.assertTrue(iterator.advance());
@@ -291,7 +291,7 @@ public class BigQueryUtilTest {
     onTableList(dataList);
 
     try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
+        BigQueryHelpers.parseTableSpec("project:dataset.table"),
         mockClient)) {
       iterator.open();
 
@@ -320,7 +320,7 @@ public class BigQueryUtilTest {
         .thenReturn(page2);
 
     try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
+        BigQueryHelpers.parseTableSpec("project:dataset.table"),
         mockClient)) {
       iterator.open();
 
@@ -350,7 +350,7 @@ public class BigQueryUtilTest {
         .thenThrow(new IOException("No such table"));
 
     try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
+        BigQueryHelpers.parseTableSpec("project:dataset.table"),
         mockClient)) {
       try {
         iterator.open(); // throws.
@@ -387,7 +387,7 @@ public class BigQueryUtilTest {
     errorsIndices.add(new ArrayList<Long>());
     onInsertAll(errorsIndices);
 
-    TableReference ref = BigQueryIO
+    TableReference ref = BigQueryHelpers
         .parseTableSpec("project:dataset.table");
     DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5);