Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/14 23:26:50 UTC

[1/2] beam git commit: BQIO.Read option for use with repeated template invocations.

Repository: beam
Updated Branches:
  refs/heads/master f77d0351c -> 47273b969


BQIO.Read option for use with repeated template invocations.

Using this option takes a different code path that generates the job ID
at runtime, so the pipeline can be invoked repeatedly via templates.
However, this comes at the cost of not supporting dynamic work
rebalancing (as of this writing).
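
For illustration, a minimal sketch of enabling the new option (the
project, dataset, and table identifiers below are hypothetical, not
taken from this change):

    PCollection<TableRow> rows =
        pipeline.apply(
            "ReadFromBigQuery",
            BigQueryIO.read()
                .from("my-project:my_dataset.my_table") // hypothetical table
                .withTemplateCompatibility()            // opt in to the runtime-job-ID path
                .withoutValidation());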


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5c0961f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5c0961f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5c0961f

Branch: refs/heads/master
Commit: c5c0961f7f05d7288ffcab05e629acec78f4c121
Parents: f77d035
Author: Sam McVeety <sg...@google.com>
Authored: Wed Apr 19 20:34:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 14 16:10:11 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 181 +++++++++++++++----
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |  45 +++--
 .../io/gcp/bigquery/PassThroughThenCleanup.java |  46 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  74 +++++---
 .../sdk/io/gcp/bigquery/FakeDatasetService.java |   5 +-
 5 files changed, 268 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c5c0961f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6a93279..cc288e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -43,10 +43,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -67,8 +70,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
@@ -76,7 +88,10 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -284,6 +299,7 @@ public class BigQueryIO {
   public static Read read() {
     return new AutoValue_BigQueryIO_Read.Builder()
         .setValidate(true)
+        .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
         .build();
   }
@@ -296,6 +312,9 @@ public class BigQueryIO {
     abstract boolean getValidate();
     @Nullable abstract Boolean getFlattenResults();
     @Nullable abstract Boolean getUseLegacySql();
+
+    abstract Boolean getWithTemplateCompatibility();
+
     abstract BigQueryServices getBigQueryServices();
     abstract Builder toBuilder();
 
@@ -306,6 +325,9 @@ public class BigQueryIO {
       abstract Builder setValidate(boolean validate);
       abstract Builder setFlattenResults(Boolean flattenResults);
       abstract Builder setUseLegacySql(Boolean useLegacySql);
+
+      abstract Builder setWithTemplateCompatibility(Boolean useTemplateCompatibility);
+
       abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
       abstract Read build();
     }
@@ -397,11 +419,35 @@ public class BigQueryIO {
       return toBuilder().setUseLegacySql(false).build();
     }
 
+    /**
+     * Use the new template-compatible source implementation.
+     *
+     * <p>This implementation is compatible with repeated template invocations. It does not
+     * support dynamic work rebalancing.
+     */
+    @Experimental(Experimental.Kind.SOURCE_SINK)
+    public Read withTemplateCompatibility() {
+      return toBuilder().setWithTemplateCompatibility(true).build();
+    }
+
     @VisibleForTesting
     Read withTestServices(BigQueryServices testServices) {
       return toBuilder().setBigQueryServices(testServices).build();
     }
 
+    private BigQuerySourceBase createSource(String jobUuid) {
+      BigQuerySourceBase source;
+      if (getQuery() == null
+          || (getQuery().isAccessible() && Strings.isNullOrEmpty(getQuery().get()))) {
+        source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices());
+      } else {
+        source =
+            BigQueryQuerySource.create(
+                jobUuid, getQuery(), getFlattenResults(), getUseLegacySql(), getBigQueryServices());
+      }
+      return source;
+    }
+
     @Override
     public void validate(PipelineOptions options) {
       // Even if existence validation is disabled, we need to make sure that the BigQueryIO
@@ -483,41 +529,115 @@ public class BigQueryIO {
 
     @Override
     public PCollection<TableRow> expand(PBegin input) {
-      final String stepUuid = BigQueryHelpers.randomUUIDString();
-      BoundedSource<TableRow> source;
-
-      if (getQuery() != null
-          && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
-        source =
-            BigQueryQuerySource.create(
-                stepUuid,
-                getQuery(),
-                getFlattenResults(),
-                getUseLegacySql(),
-                getBigQueryServices());
+      Pipeline p = input.getPipeline();
+      final PCollectionView<String> jobIdTokenView;
+      PCollection<String> jobIdTokenCollection = null;
+      PCollection<TableRow> rows;
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at construction time.
+        final String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.<String>asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid)))
+                .setCoder(getDefaultOutputCoder());
       } else {
-        source =
-            BigQueryTableSource.create(
-                stepUuid,
-                getTableProvider(),
-                getBigQueryServices());
+        // Create a singleton job ID token at execution time.
+        jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.<String>asSingleton());
+
+        final TupleTag<String> filesTag = new TupleTag<>();
+        final TupleTag<String> tableSchemaTag = new TupleTag<>();
+        PCollectionTuple tuple =
+            jobIdTokenCollection.apply(
+                "RunCreateJob",
+                ParDo.of(
+                        new DoFn<String, String>() {
+                          @ProcessElement
+                          public void processElement(ProcessContext c) throws Exception {
+                            String jobUuid = c.element();
+                            BigQuerySourceBase source = createSource(jobUuid);
+                            String schema =
+                                BigQueryHelpers.toJsonString(
+                                    source.getSchema(c.getPipelineOptions()));
+                            c.output(tableSchemaTag, schema);
+                            List<ResourceId> files = source.extractFiles(c.getPipelineOptions());
+                            for (ResourceId file : files) {
+                              c.output(file.toString());
+                            }
+                          }
+                        })
+                    .withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));
+        tuple.get(filesTag).setCoder(StringUtf8Coder.of());
+        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
+        final PCollectionView<String> schemaView =
+            tuple.get(tableSchemaTag).apply(View.<String>asSingleton());
+        rows =
+            tuple
+                .get(filesTag)
+                .apply(
+                    WithKeys.of(
+                        new SerializableFunction<String, String>() {
+                          public String apply(String s) {
+                            return s;
+                          }
+                        }))
+                .apply(Reshuffle.<String, String>of())
+                .apply(Values.<String>create())
+                .apply(
+                    "ReadFiles",
+                    ParDo.of(
+                            new DoFn<String, TableRow>() {
+                              @ProcessElement
+                              public void processElement(ProcessContext c) throws Exception {
+                                TableSchema schema =
+                                    BigQueryHelpers.fromJsonString(
+                                        c.sideInput(schemaView), TableSchema.class);
+                                String jobUuid = c.sideInput(jobIdTokenView);
+                                BigQuerySourceBase source = createSource(jobUuid);
+                                List<BoundedSource<TableRow>> sources =
+                                    source.createSources(
+                                        ImmutableList.of(
+                                            FileSystems.matchNewResource(
+                                                c.element(), false /* is directory */)),
+                                        schema);
+                                checkArgument(sources.size() == 1, "Expected exactly one source.");
+                                BoundedSource<TableRow> avroSource = sources.get(0);
+                                BoundedSource.BoundedReader<TableRow> reader =
+                                    avroSource.createReader(c.getPipelineOptions());
+                                for (boolean more = reader.start(); more; more = reader.advance()) {
+                                  c.output(reader.getCurrent());
+                                }
+                              }
+                            })
+                        .withSideInputs(schemaView, jobIdTokenView));
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
           new PassThroughThenCleanup.CleanupOperation() {
             @Override
-            void cleanup(PipelineOptions options) throws Exception {
+            void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
+              PipelineOptions options = c.getPipelineOptions();
               BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+              String jobUuid = c.getJobId();
               final String extractDestinationDir =
-                  resolveTempLocation(
-                      bqOptions.getTempLocation(),
-                      "BigQueryExtractTemp",
-                      stepUuid);
-
+                  resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid);
+              final String executingProject = bqOptions.getProject();
               JobReference jobRef =
                   new JobReference()
-                      .setProjectId(bqOptions.getProject())
-                      .setJobId(
-                          getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid)));
+                      .setProjectId(executingProject)
+                      .setJobId(getExtractJobId(createJobIdToken(bqOptions.getJobName(), jobUuid)));
 
               Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
 
@@ -525,16 +645,13 @@ public class BigQueryIO {
                 List<ResourceId> extractFiles =
                     getExtractFilePaths(extractDestinationDir, extractJob);
                 if (extractFiles != null && !extractFiles.isEmpty()) {
-                  FileSystems.delete(extractFiles,
-                      MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+                  FileSystems.delete(
+                      extractFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
                 }
               }
             }
           };
-      return input.getPipeline()
-          .apply(org.apache.beam.sdk.io.Read.from(source))
-          .setCoder(getDefaultOutputCoder())
-          .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
+      return rows.apply(new PassThroughThenCleanup<TableRow>(cleanupOperation, jobIdTokenView));
     }
 
     @Override

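A note on the WithKeys/Reshuffle/Values sequence in the new path above:
it is the standard Beam idiom for breaking fusion, so that the file paths
emitted by the single "RunCreateJob" DoFn are checkpointed and
redistributed across workers before "ReadFiles" runs. A generic sketch of
the idiom (the String element type and identity key function mirror the
change above):

    PCollection<String> redistributed =
        files
            .apply(
                WithKeys.of(
                    new SerializableFunction<String, String>() {
                      @Override
                      public String apply(String path) {
                        return path; // any stable key works; the element itself is simplest
                      }
                    }))
            .apply(Reshuffle.<String, String>of()) // materialize and redistribute
            .apply(Values.<String>create());       // drop the keys again
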
http://git-wip-us.apache.org/repos/asf/beam/blob/c5c0961f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 945c7d4..2de60a2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -75,6 +75,31 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     this.bqServices = checkNotNull(bqServices, "bqServices");
   }
 
+  protected TableSchema getSchema(PipelineOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    TableReference tableToExtract = getTableToExtract(bqOptions);
+    TableSchema tableSchema =
+        bqServices.getDatasetService(bqOptions).getTable(tableToExtract).getSchema();
+    return tableSchema;
+  }
+
+  protected List<ResourceId> extractFiles(PipelineOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    TableReference tableToExtract = getTableToExtract(bqOptions);
+    JobService jobService = bqServices.getJobService(bqOptions);
+    String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
+    final String extractDestinationDir =
+        resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+    List<ResourceId> tempFiles =
+        executeExtract(
+            extractJobId,
+            tableToExtract,
+            jobService,
+            bqOptions.getProject(),
+            extractDestinationDir);
+    return tempFiles;
+  }
+
   @Override
   public List<BoundedSource<TableRow>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
@@ -83,20 +108,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     // We ignore desiredBundleSizeBytes anyway; in any case, we should not initiate
     // another BigQuery extract job for repeated split() calls.
     if (cachedSplitResult == null) {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      TableReference tableToExtract = getTableToExtract(bqOptions);
-      JobService jobService = bqServices.getJobService(bqOptions);
-
-      final String extractDestinationDir =
-          resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
-
-      String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
-      List<ResourceId> tempFiles = executeExtract(
-          extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
-
-      TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
-          .getTable(tableToExtract).getSchema();
+      List<ResourceId> tempFiles = extractFiles(options);
+      TableSchema tableSchema = getSchema(options);
 
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       cleanupTempResource(bqOptions);
       cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema));
     }
@@ -147,8 +162,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
-  private List<BoundedSource<TableRow>> createSources(
-      List<ResourceId> files, TableSchema tableSchema) throws IOException, InterruptedException {
+  List<BoundedSource<TableRow>> createSources(List<ResourceId> files, TableSchema tableSchema)
+      throws IOException, InterruptedException {
     final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
 
     SerializableFunction<GenericRecord, TableRow> function =

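The refactoring above promotes getSchema(), extractFiles(), and
createSources() to reusable methods so that the template-compatible path
in BigQueryIO can drive the same extract-and-read sequence from plain
DoFns instead of going through split(). Roughly, the call sequence is (a
sketch; `source` and `options` stand for a BigQuerySourceBase instance
and the runtime PipelineOptions):

    TableSchema schema = source.getSchema(options);        // fetch the table schema
    List<ResourceId> files = source.extractFiles(options); // run the BigQuery extract job
    List<BoundedSource<TableRow>> sources =
        source.createSources(files, schema);               // one bounded source per extract file
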
http://git-wip-us.apache.org/repos/asf/beam/blob/c5c0961f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index f49c4e1..de26c8d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -41,9 +41,12 @@ import org.apache.beam.sdk.values.TupleTagList;
 class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
 
   private CleanupOperation cleanupOperation;
+  private PCollectionView<String> jobIdSideInput;
 
-  PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+  PassThroughThenCleanup(
+      CleanupOperation cleanupOperation, PCollectionView<String> jobIdSideInput) {
     this.cleanupOperation = cleanupOperation;
+    this.jobIdSideInput = jobIdSideInput;
   }
 
   @Override
@@ -57,16 +60,19 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T
         .setCoder(VoidCoder.of())
         .apply(View.<Void>asIterable());
 
-    input.getPipeline()
+    input
+        .getPipeline()
         .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
-        .apply("Cleanup", ParDo.of(
-            new DoFn<CleanupOperation, Void>() {
-              @ProcessElement
-              public void processElement(ProcessContext c)
-                  throws Exception {
-                c.element().cleanup(c.getPipelineOptions());
-              }
-            }).withSideInputs(cleanupSignalView));
+        .apply(
+            "Cleanup",
+            ParDo.of(
+                    new DoFn<CleanupOperation, Void>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) throws Exception {
+                        c.element().cleanup(new ContextContainer(c, jobIdSideInput));
+                      }
+                    })
+                .withSideInputs(jobIdSideInput, cleanupSignalView));
 
     return outputs.get(mainOutput);
   }
@@ -79,6 +85,24 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   abstract static class CleanupOperation implements Serializable {
-    abstract void cleanup(PipelineOptions options) throws Exception;
+    abstract void cleanup(ContextContainer container) throws Exception;
+  }
+
+  static class ContextContainer {
+    private PCollectionView<String> view;
+    private DoFn<?, ?>.ProcessContext context;
+
+    public ContextContainer(DoFn<?, ?>.ProcessContext context, PCollectionView<String> view) {
+      this.view = view;
+      this.context = context;
+    }
+
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    public String getJobId() {
+      return context.sideInput(view);
+    }
   }
 }
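
With the new signature, a CleanupOperation receives a ContextContainer
instead of raw PipelineOptions, so it can recover both the options and
the runtime job ID (resolved from the job-ID side input). A minimal
sketch of an implementation, mirroring the usage in BigQueryIO:

    PassThroughThenCleanup.CleanupOperation op =
        new PassThroughThenCleanup.CleanupOperation() {
          @Override
          void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
            PipelineOptions options = c.getPipelineOptions();
            String jobUuid = c.getJobId(); // side-input value, available only at execution time
            // ... delete temp resources derived from jobUuid ...
          }
        };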

http://git-wip-us.apache.org/repos/asf/beam/blob/c5c0961f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d31f3a0..3465b4e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -402,7 +402,17 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testReadFromTable() throws IOException, InterruptedException {
+  public void testReadFromTableOldSource() throws IOException, InterruptedException {
+    testReadFromTable(false);
+  }
+
+  @Test
+  public void testReadFromTableTemplateCompatibility() throws IOException, InterruptedException {
+    testReadFromTable(true);
+  }
+
+  private void testReadFromTable(boolean useTemplateCompatibility)
+      throws IOException, InterruptedException {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
@@ -444,17 +454,27 @@ public class BigQueryIOTest implements Serializable {
         .withDatasetService(fakeDatasetService);
 
     Pipeline p = TestPipeline.create(bqOptions);
-    PCollection<KV<String, Long>> output = p
-        .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
+    BigQueryIO.Read read =
+        BigQueryIO.read()
+            .from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
-            .withoutValidation())
-        .apply(ParDo.of(new DoFn<TableRow, KV<String, Long>>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) throws Exception {
-            c.output(KV.of((String) c.element().get("name"),
-                Long.valueOf((String) c.element().get("number"))));
-          }
-        }));
+            .withoutValidation();
+    if (useTemplateCompatibility) {
+      read = read.withTemplateCompatibility();
+    }
+    PCollection<KV<String, Long>> output =
+        p.apply(read)
+            .apply(
+                ParDo.of(
+                    new DoFn<TableRow, KV<String, Long>>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) throws Exception {
+                        c.output(
+                            KV.of(
+                                (String) c.element().get("name"),
+                                Long.valueOf((String) c.element().get("number"))));
+                      }
+                    }));
 
     PAssert.that(output)
         .containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L)));
@@ -1721,13 +1741,17 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testPassThroughThenCleanup() throws Exception {
 
-    PCollection<Integer> output = p
-        .apply(Create.of(1, 2, 3))
-        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
-          @Override
-          void cleanup(PipelineOptions options) throws Exception {
-            // no-op
-          }}));
+    PCollection<Integer> output =
+        p.apply(Create.of(1, 2, 3))
+            .apply(
+                new PassThroughThenCleanup<Integer>(
+                    new CleanupOperation() {
+                      @Override
+                      void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
+                        // no-op
+                      }
+                    },
+                    p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));
 
     PAssert.that(output).containsInAnyOrder(1, 2, 3);
 
@@ -1738,11 +1762,15 @@ public class BigQueryIOTest implements Serializable {
   public void testPassThroughThenCleanupExecuted() throws Exception {
 
     p.apply(Create.empty(VarIntCoder.of()))
-        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
-          @Override
-          void cleanup(PipelineOptions options) throws Exception {
-            throw new RuntimeException("cleanup executed");
-          }}));
+        .apply(
+            new PassThroughThenCleanup<Integer>(
+                new CleanupOperation() {
+                  @Override
+                  void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
+                    throw new RuntimeException("cleanup executed");
+                  }
+                },
+                p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));
 
     thrown.expect(RuntimeException.class);
     thrown.expectMessage("cleanup executed");

http://git-wip-us.apache.org/repos/asf/beam/blob/c5c0961f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index 6ee5340..bcd84f7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -273,7 +273,8 @@ class FakeDatasetService implements DatasetService, Serializable {
 
   void throwNotFound(String format, Object... args) throws IOException {
     throw new IOException(
-        new GoogleJsonResponseException.Builder(404,
-            String.format(format, args), new HttpHeaders()).build());
+        String.format(format, args),
+        new GoogleJsonResponseException.Builder(404, String.format(format, args), new HttpHeaders())
+            .build());
   }
 }


[2/2] beam git commit: This closes #2123: Provide implementation of BQIO.Read that does not use Source API

Posted by jk...@apache.org.
This closes #2123: Provide implementation of BQIO.Read that does not use Source API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47273b96
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47273b96
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47273b96

Branch: refs/heads/master
Commit: 47273b9697ab8af8b30e910d5b992398002ce584
Parents: f77d035 c5c0961
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 14 16:15:18 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 14 16:15:18 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 181 +++++++++++++++----
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |  45 +++--
 .../io/gcp/bigquery/PassThroughThenCleanup.java |  46 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  74 +++++---
 .../sdk/io/gcp/bigquery/FakeDatasetService.java |   5 +-
 5 files changed, 268 insertions(+), 83 deletions(-)
----------------------------------------------------------------------