Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/04 18:57:09 UTC

[1/2] beam git commit: Change BigQueryIO.Write return type to allow for future new functionality

Repository: beam
Updated Branches:
  refs/heads/master 50fc63a9b -> 65ffd6c64


Change BigQueryIO.Write return type to allow for future new functionality


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475bf135
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475bf135
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475bf135

Branch: refs/heads/master
Commit: 475bf13575fde12ce4de6e935d828123a0f317f4
Parents: 50fc63a
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 2 20:12:05 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 4 11:56:53 2017 -0700

----------------------------------------------------------------------
 .../complete/game/utils/WriteToBigQuery.java    |  3 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  3 +-
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  7 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  5 +--
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    | 14 ++----
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   | 47 ++++++++++++++++++++
 6 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
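The user-visible effect of this change is that applying BigQueryIO.Write now
produces a WriteResult (a POutput that currently exposes no consumable values)
instead of PDone. A minimal sketch of the new usage follows; the transform
name, table spec, and schema are placeholders and not part of this patch, and
since WriteResult is package-private in this commit, callers simply stop
treating the apply() result as a PDone:

    // Sketch only. Assumes the usual BigQueryIO imports plus
    // com.google.api.services.bigquery.model.{TableRow, TableSchema}.
    static class WriteRows extends PTransform<PCollection<TableRow>, PDone> {
      @Override
      public PDone expand(PCollection<TableRow> rows) {
        // apply() no longer evaluates to PDone, so its result is not returned directly...
        rows.apply("WriteToBQ",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")          // placeholder table spec
                .withSchema(new TableSchema())                 // placeholder schema
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
        // ...and transforms that still expose PDone construct it explicitly,
        // exactly as the example classes below now do.
        return PDone.in(rows.getPipeline());
      }
    }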


http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index d60510f..5eecddb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -119,12 +119,13 @@ public class WriteToBigQuery<InputT>
 
   @Override
   public PDone expand(PCollection<InputT> teamAndScore) {
-    return teamAndScore
+    teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName))
           .withSchema(getSchema())
           .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
           .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+    return PDone.in(teamAndScore.getPipeline());
   }
 
   /** Utility to construct an output table reference. */

http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index b8e12c1..e602258 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -59,13 +59,14 @@ public class WriteWindowedToBigQuery<T>
 
   @Override
   public PDone expand(PCollection<T> teamAndScore) {
-    return teamAndScore
+    teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
       .apply(BigQueryIO.writeTableRows()
                 .to(getTable(teamAndScore.getPipeline(), tableName))
                 .withSchema(getSchema())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                 .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+    return PDone.in(teamAndScore.getPipeline());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
index 75b1cc7..c323858 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -53,7 +52,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
  */
-class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
+class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
   BigQueryIO.Write<T> write;
 
   BatchLoadBigQuery(BigQueryIO.Write<T> write) {
@@ -61,7 +60,7 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   @Override
-  public PDone expand(PCollection<T> input) {
+  public WriteResult expand(PCollection<T> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
     ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
@@ -177,6 +176,6 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
             write.getTableDescription()))
             .withSideInputs(jobIdTokenView));
 
-    return PDone.in(input.getPipeline());
+    return WriteResult.in(input.getPipeline());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index cc6ec09..3c7b549 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -674,7 +673,7 @@ public class BigQueryIO {
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
     @VisibleForTesting
     // Maximum number of files in a single partition.
     static final int MAX_NUM_FILES = 10000;
@@ -984,7 +983,7 @@ public class BigQueryIO {
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public WriteResult expand(PCollection<T> input) {
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {

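The condition at the end of this hunk chooses between the two write
implementations touched by this patch. A simplified sketch of that dispatch
(not the verbatim method body, which also validates options and table
references) is:

    if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
      // Unbounded input or dynamic table destinations: streaming inserts.
      return input.apply(new StreamWithDeDup<T>(this));
    } else {
      // Bounded input with a fixed table: BigQuery batch-load jobs.
      return input.apply(new BatchLoadBigQuery<T>(this));
    }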
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
index f667295..1fa26d1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -32,13 +32,12 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 
 /**
 * PTransform that performs streaming BigQuery write. To increase consistency,
 * it leverages BigQuery best effort de-dup mechanism.
  */
-class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
+class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
   private final Write<T> write;
 
   /** Constructor. */
@@ -52,7 +51,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   @Override
-  public PDone expand(PCollection<T> input) {
+  public WriteResult expand(PCollection<T> input) {
     // A naive implementation would be to simply stream data directly to BigQuery.
     // However, this could occasionally lead to duplicated data, e.g., when
     // a VM that runs this code is restarted and the code is re-run.
@@ -86,13 +85,6 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
                     write.getCreateDisposition(),
                     write.getTableDescription(),
                     write.getBigQueryServices())));
-
-    // Note that the implementation to return PDone here breaks the
-    // implicit assumption about the job execution order. If a user
-    // implements a PTransform that takes PDone returned here as its
-    // input, the transform may not necessarily be executed after
-    // the BigQueryIO.Write.
-
-    return PDone.in(input.getPipeline());
+    return WriteResult.in(input.getPipeline());
   }
 }

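For context on the Reshuffle import above and the removed PDone note: the
de-duplication this transform relies on comes from assigning each element a
random insert id, checkpointing those ids with a Reshuffle, and then streaming
each row with its id, so retried bundles resend the same id and BigQuery can
drop duplicates on a best-effort basis. A rough sketch of that pattern (helper
names and the TableRow element type are illustrative, not the actual class
internals; assumes the Beam DoFn/ParDo/Reshuffle imports plus java.util.UUID):

    PCollection<KV<String, TableRow>> tagged = input
        .apply("TagWithUniqueId", ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // The random key becomes the BigQuery insertId for this row.
            c.output(KV.of(UUID.randomUUID().toString(), c.element()));
          }
        }));

    // Reshuffle checkpoints the ids, so a retry of the write step re-sends the
    // same insertIds rather than generating new ones.
    PCollection<KV<String, TableRow>> checkpointed =
        tagged.apply(Reshuffle.<String, TableRow>of());

    // The actual class then applies its streaming-insert DoFn (constructed at
    // the end of the hunk above) to the checkpointed collection, using the key
    // as the insertId.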
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
new file mode 100644
index 0000000..07fbc68
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.POutputValueBase;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+
+/**
+ * The result of a {@link BigQueryIO.Write} transform.
+ */
+final class WriteResult extends POutputValueBase {
+  /**
+   * Creates a {@link WriteResult} in the given {@link Pipeline}.
+   */
+  static WriteResult in(Pipeline pipeline) {
+    return new WriteResult(pipeline);
+  }
+
+  @Override
+  public List<TaggedPValue> expand() {
+    return Collections.emptyList();
+  }
+
+  private WriteResult(Pipeline pipeline) {
+    super(pipeline);
+  }
+}

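WriteResult currently expands to an empty list, so it behaves like PDone from
the runner's point of view; the new return type simply leaves room for outputs
to be added later, per the commit message. Purely as a hypothetical sketch (not
part of this patch; the tag and collection names are invented), a future
variant could surface a PCollection through expand() like this:

    // Hypothetical only. Assumes imports of TupleTag, TaggedPValue, PCollection,
    // Pipeline, and com.google.api.services.bigquery.model.TableRow in addition
    // to those already in WriteResult.java.
    final class WriteResultWithOutputs extends POutputValueBase {
      private final TupleTag<TableRow> failedRowsTag;   // invented name
      private final PCollection<TableRow> failedRows;   // invented output

      WriteResultWithOutputs(Pipeline pipeline,
          TupleTag<TableRow> failedRowsTag, PCollection<TableRow> failedRows) {
        super(pipeline);
        this.failedRowsTag = failedRowsTag;
        this.failedRows = failedRows;
      }

      @Override
      public List<TaggedPValue> expand() {
        return Collections.singletonList(TaggedPValue.of(failedRowsTag, failedRows));
      }
    }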

[2/2] beam git commit: This closes #2404

Posted by dh...@apache.org.
This closes #2404


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/65ffd6c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/65ffd6c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/65ffd6c6

Branch: refs/heads/master
Commit: 65ffd6c64bd1aed08dcb352962bab01b9bd9b3ca
Parents: 50fc63a 475bf13
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 4 11:56:59 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 4 11:56:59 2017 -0700

----------------------------------------------------------------------
 .../complete/game/utils/WriteToBigQuery.java    |  3 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  3 +-
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  7 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  5 +--
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    | 14 ++----
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   | 47 ++++++++++++++++++++
 6 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------