You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:37 UTC
[02/13] incubator-beam git commit: Port BigQueryIO to new DoFn
Port BigQueryIO to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d
Branch: refs/heads/master
Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf
Parents: 87313f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:26:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 ++++++++++----------
1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ed2c32e..36e09f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -1785,7 +1784,7 @@ public class BigQueryIO {
return PDone.in(input.getPipeline());
}
- private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+ private class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
private TableRowWriter writer = null;
private final String tempFilePrefix;
@@ -1793,7 +1792,7 @@ public class BigQueryIO {
this.tempFilePrefix = tempFilePrefix;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
if (writer == null) {
writer = new TableRowWriter(tempFilePrefix);
@@ -1806,7 +1805,7 @@ public class BigQueryIO {
// Discard write result and close the write.
try {
writer.close();
- // The writer does not need to be reset, as this OldDoFn cannot be reused.
+ // The writer does not need to be reset, as this DoFn cannot be reused.
} catch (Exception closeException) {
// Do not mask the exception that caused the write to fail.
e.addSuppressed(closeException);
@@ -1815,7 +1814,7 @@ public class BigQueryIO {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
if (writer != null) {
c.output(writer.close());
@@ -1959,7 +1958,7 @@ public class BigQueryIO {
/**
* Partitions temporary files based on number of files and file sizes.
*/
- static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+ static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
private TupleTag<KV<Long, List<String>>> singlePartitionTag;
@@ -1973,7 +1972,7 @@ public class BigQueryIO {
this.singlePartitionTag = singlePartitionTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
if (results.isEmpty()) {
@@ -2015,7 +2014,7 @@ public class BigQueryIO {
/**
* Writes partitions to BigQuery tables.
*/
- static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+ static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
private final boolean singlePartition;
private final BigQueryServices bqServices;
private final String jobIdToken;
@@ -2044,7 +2043,7 @@ public class BigQueryIO {
this.createDisposition = createDisposition;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
@@ -2149,7 +2148,7 @@ public class BigQueryIO {
/**
* Copies temporary tables to destination table.
*/
- static class WriteRename extends OldDoFn<String, Void> {
+ static class WriteRename extends DoFn<String, Void> {
private final BigQueryServices bqServices;
private final String jobIdToken;
private final String jsonTableRef;
@@ -2172,7 +2171,7 @@ public class BigQueryIO {
this.tempTablesView = tempTablesView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));