You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/24 14:47:31 UTC

[2/2] incubator-beam git commit: Choose BigQuery Write implementation based on Input PCollection

Choose BigQuery Write implementation based on Input PCollection

Stop using PipelineOptions, and instead use the boundedness of the input
to choose how to write to BigQuery. This means that runners that don't
use the streaming flag still function appropriately.

Fixes BEAM-746


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75d137b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75d137b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75d137b8

Branch: refs/heads/master
Commit: 75d137b81ae240ad1a2e8942627738a6871581c1
Parents: 6d9d8bc
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 12 17:58:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Oct 24 07:37:41 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  7 ++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 35 +++++++++++++++-----
 2 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75d137b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 5626067..50c5ae9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -119,6 +119,7 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -1748,9 +1749,9 @@ public class BigQueryIO {
         BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
         BigQueryServices bqServices = getBigQueryServices();
 
-        // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
-        // and BigQuery's streaming import API.
-        if (options.isStreaming() || tableRefFunction != null) {
+        // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
+        // StreamWithDeDup and BigQuery's streaming import API.
+        if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
           return input.apply(
               new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75d137b8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 74c35a6..51a69a2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -91,6 +91,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
@@ -120,8 +121,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -1190,13 +1193,12 @@ public class BigQueryIOTest implements Serializable {
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
-  private void testWriteValidatesDataset(boolean streaming) throws Exception {
+  private void testWriteValidatesDataset(boolean unbounded) throws Exception {
     String projectId = "someproject";
     String datasetId = "somedataset";
 
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject(projectId);
-    options.setStreaming(streaming);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(mockJobService)
@@ -1210,17 +1212,34 @@ public class BigQueryIOTest implements Serializable {
     tableRef.setDatasetId(datasetId);
     tableRef.setTableId("sometable");
 
+    PCollection<TableRow> tableRows;
+    if (unbounded) {
+      tableRows =
+          p.apply(CountingInput.unbounded())
+              .apply(
+                  MapElements.via(
+                      new SimpleFunction<Long, TableRow>() {
+                        @Override
+                        public TableRow apply(Long input) {
+                          return null;
+                        }
+                      }))
+              .setCoder(TableRowJsonCoder.of());
+    } else {
+      tableRows = p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()));
+    }
+
     thrown.expect(RuntimeException.class);
     // Message will be one of following depending on the execution environment.
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-     .apply(BigQueryIO.Write
-         .to(tableRef)
-         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-         .withSchema(new TableSchema())
-         .withTestServices(fakeBqServices));
+    tableRows
+        .apply(
+            BigQueryIO.Write.to(tableRef)
+                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                .withSchema(new TableSchema())
+                .withTestServices(fakeBqServices));
   }
 
   @Test