You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:37 UTC

[02/13] incubator-beam git commit: Port BigQueryIO to new DoFn

Port BigQueryIO to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d

Branch: refs/heads/master
Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf
Parents: 87313f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:26:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ed2c32e..36e09f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -1785,7 +1784,7 @@ public class BigQueryIO {
         return PDone.in(input.getPipeline());
       }
 
-      private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+      private class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
         private TableRowWriter writer = null;
         private final String tempFilePrefix;
 
@@ -1793,7 +1792,7 @@ public class BigQueryIO {
           this.tempFilePrefix = tempFilePrefix;
         }
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           if (writer == null) {
             writer = new TableRowWriter(tempFilePrefix);
@@ -1806,7 +1805,7 @@ public class BigQueryIO {
             // Discard write result and close the write.
             try {
               writer.close();
-              // The writer does not need to be reset, as this OldDoFn cannot be reused.
+              // The writer does not need to be reset, as this DoFn cannot be reused.
             } catch (Exception closeException) {
               // Do not mask the exception that caused the write to fail.
               e.addSuppressed(closeException);
@@ -1815,7 +1814,7 @@ public class BigQueryIO {
           }
         }
 
-        @Override
+        @FinishBundle
         public void finishBundle(Context c) throws Exception {
           if (writer != null) {
             c.output(writer.close());
@@ -1959,7 +1958,7 @@ public class BigQueryIO {
     /**
      * Partitions temporary files based on number of files and file sizes.
      */
-    static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+    static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
       private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
       private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
       private TupleTag<KV<Long, List<String>>> singlePartitionTag;
@@ -1973,7 +1972,7 @@ public class BigQueryIO {
         this.singlePartitionTag = singlePartitionTag;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
         if (results.isEmpty()) {
@@ -2015,7 +2014,7 @@ public class BigQueryIO {
     /**
      * Writes partitions to BigQuery tables.
      */
-    static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+    static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
       private final boolean singlePartition;
       private final BigQueryServices bqServices;
       private final String jobIdToken;
@@ -2044,7 +2043,7 @@ public class BigQueryIO {
         this.createDisposition = createDisposition;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
         String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
@@ -2149,7 +2148,7 @@ public class BigQueryIO {
     /**
      * Copies temporary tables to destination table.
      */
-    static class WriteRename extends OldDoFn<String, Void> {
+    static class WriteRename extends DoFn<String, Void> {
       private final BigQueryServices bqServices;
       private final String jobIdToken;
       private final String jsonTableRef;
@@ -2172,7 +2171,7 @@ public class BigQueryIO {
         this.tempTablesView = tempTablesView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));