Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:01 UTC

[07/10] beam git commit: Simplify configuration of StreamWithDedup

Simplify configuration of StreamWithDedup
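
The pattern of the change, roughly: instead of Write.expand() unpacking each setting
(table, table-reference function, JSON schema, create disposition, description,
BigQueryServices) and passing them one by one into StreamWithDeDup's constructor,
StreamWithDeDup now holds the configured Write transform itself and reads those
settings through its getters at expansion time. Below is a minimal, self-contained
sketch of that pattern only; the class and method names are placeholders, not the
actual Beam BigQueryIO types.

  // Hypothetical illustration of the refactoring pattern in this commit.
  // The fully-configured outer transform exposes its settings via getters.
  class WriteConfig {
    private final String tableSpec;
    private final String description;

    WriteConfig(String tableSpec, String description) {
      this.tableSpec = tableSpec;
      this.description = description;
    }

    String getTableSpec() { return tableSpec; }
    String getDescription() { return description; }
  }

  // Before: each setting is copied into the inner transform separately,
  // so every new option means another constructor parameter.
  class StreamingInsertOld {
    private final String tableSpec;
    private final String description;

    StreamingInsertOld(String tableSpec, String description) {
      this.tableSpec = tableSpec;
      this.description = description;
    }
  }

  // After: the inner transform keeps the whole configuration object and
  // calls write.getTableSpec() / write.getDescription() when it needs them.
  class StreamingInsertNew {
    private final WriteConfig write;

    StreamingInsertNew(WriteConfig write) {
      this.write = write;
    }
  }

Holding the whole configuration object means new Write options become visible to the
streaming path without further constructor churn, which is the simplification the
commit title refers to. The change also relies on getBigQueryServices() always
returning a concrete instance, so the MoreObjects.firstNonNull defaulting is dropped.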


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1adcbaea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1adcbaea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1adcbaea

Branch: refs/heads/master
Commit: 1adcbaea799e83016ec91f7b7155c3a25804ce6c
Parents: 5c71589
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:19:51 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:32 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 73 +++++++-------------
 1 file changed, 26 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1adcbaea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d2f6ba6..e039c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,7 +40,6 @@ import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -462,7 +461,7 @@ public class BigQueryIO {
     abstract boolean getValidate();
     @Nullable abstract Boolean getFlattenResults();
     @Nullable abstract Boolean getUseLegacySql();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
 
@@ -645,8 +644,6 @@ public class BigQueryIO {
           jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
       BoundedSource<TableRow> source;
-      final BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
@@ -670,11 +667,11 @@ public class BigQueryIO {
                 getFlattenResults(),
                 getUseLegacySql(),
                 extractDestinationDir,
-                bqServices);
+                getBigQueryServices());
       } else {
         ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
         source = BigQueryTableSource.create(
-            jobIdToken, inputTable, extractDestinationDir, bqServices,
+            jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
             StaticValueProvider.of(executingProject));
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -687,7 +684,7 @@ public class BigQueryIO {
                   .setProjectId(executingProject)
                   .setJobId(getExtractJobId(jobIdToken));
 
-              Job extractJob = bqServices.getJobService(bqOptions)
+              Job extractJob = getBigQueryServices().getJobService(bqOptions)
                   .getJob(jobRef);
 
               Collection<String> extractFiles = null;
@@ -1390,7 +1387,7 @@ public class BigQueryIO {
     @Nullable abstract String getTableDescription();
     /** An option to indicate if table validation is desired. Default is true. */
     abstract boolean getValidate();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
 
@@ -1650,12 +1647,10 @@ public class BigQueryIO {
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
       // The user specified a table.
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
       if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
 
-        DatasetService datasetService = bqServices.getDatasetService(options);
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1693,7 +1688,7 @@ public class BigQueryIO {
         checkArgument(
             !Strings.isNullOrEmpty(tempLocation),
             "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (bqServices == null) {
+        if (getBigQueryServices() == null) {
           try {
             GcsPath.fromUri(tempLocation);
           } catch (IllegalArgumentException e) {
@@ -1711,19 +1706,11 @@ public class BigQueryIO {
     public PDone expand(PCollection<TableRow> input) {
       Pipeline p = input.getPipeline();
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
-        return input.apply(
-            new StreamWithDeDup(getTable(), getTableRefFunction(),
-                getJsonSchema() == null ? null : NestedValueProvider.of(
-                    getJsonSchema(), new JsonSchemaToTableSchema()),
-                getCreateDisposition(),
-                getTableDescription(),
-                bqServices));
+        return input.apply(new StreamWithDeDup(this));
       }
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -1786,7 +1773,7 @@ public class BigQueryIO {
           .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
               false,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -1800,7 +1787,7 @@ public class BigQueryIO {
           .apply("TempTablesView", View.<String>asIterable());
       singleton.apply(ParDo
           .of(new WriteRename(
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
               getWriteDisposition(),
@@ -1814,7 +1801,7 @@ public class BigQueryIO {
           .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
               true,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -2740,25 +2727,11 @@ public class BigQueryIO {
   * it leverages BigQuery best effort de-dup mechanism.
    */
   private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
-    @Nullable private final transient ValueProvider<TableReference> tableReference;
-    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-    @Nullable private final transient ValueProvider<TableSchema> tableSchema;
-    private final Write.CreateDisposition createDisposition;
-    private final BigQueryServices bqServices;
-    @Nullable private final String tableDescription;
+    private final Write write;
 
     /** Constructor. */
-    StreamWithDeDup(ValueProvider<TableReference> tableReference,
-                    @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-                    @Nullable ValueProvider<TableSchema> tableSchema,
-                    Write.CreateDisposition createDisposition,
-                    @Nullable String tableDescription, BigQueryServices bqServices) {
-      this.tableReference = tableReference;
-      this.tableRefFunction = tableRefFunction;
-      this.tableSchema = tableSchema;
-      this.createDisposition = createDisposition;
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.tableDescription = tableDescription;
+    StreamWithDeDup(Write write) {
+      this.write = write;
     }
 
     @Override
@@ -2780,20 +2753,26 @@ public class BigQueryIO {
 
       PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(
           new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class),
-              tableReference, tableRefFunction)));
+              write.getTable(), write.getTableRefFunction())));
 
       // To prevent having the same TableRow processed more than once with regenerated
       // different unique ids, this implementation relies on "checkpointing", which is
       // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
       // performed by Reshuffle.
+      NestedValueProvider<TableSchema, String> schema =
+          write.getJsonSchema() == null
+              ? null
+              : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(
-              tableSchema,
-              createDisposition,
-              tableDescription,
-              bqServices)));
+          .apply(
+              ParDo.of(
+                  new StreamingWriteFn(
+                      schema,
+                      write.getCreateDisposition(),
+                      write.getTableDescription(),
+                      write.getBigQueryServices())));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user