You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/04 18:57:09 UTC
[1/2] beam git commit: Change BigQueryIO.Write return type to allow for future new functionality
Repository: beam
Updated Branches:
refs/heads/master 50fc63a9b -> 65ffd6c64
Change BigQueryIO.Write return type to allow for future new functionality
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475bf135
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475bf135
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475bf135
Branch: refs/heads/master
Commit: 475bf13575fde12ce4de6e935d828123a0f317f4
Parents: 50fc63a
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 2 20:12:05 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 4 11:56:53 2017 -0700
----------------------------------------------------------------------
.../complete/game/utils/WriteToBigQuery.java | 3 +-
.../game/utils/WriteWindowedToBigQuery.java | 3 +-
.../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 7 ++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +--
.../sdk/io/gcp/bigquery/StreamWithDeDup.java | 14 ++----
.../beam/sdk/io/gcp/bigquery/WriteResult.java | 47 ++++++++++++++++++++
6 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index d60510f..5eecddb 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -119,12 +119,13 @@ public class WriteToBigQuery<InputT>
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
- return teamAndScore
+ teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ return PDone.in(teamAndScore.getPipeline());
}
/** Utility to construct an output table reference. */
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index b8e12c1..e602258 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -59,13 +59,14 @@ public class WriteWindowedToBigQuery<T>
@Override
public PDone expand(PCollection<T> teamAndScore) {
- return teamAndScore
+ teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.writeTableRows()
.to(getTable(teamAndScore.getPipeline(), tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ return PDone.in(teamAndScore.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
index 75b1cc7..c323858 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -53,7 +52,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
*/
-class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
+class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
BigQueryIO.Write<T> write;
BatchLoadBigQuery(BigQueryIO.Write<T> write) {
@@ -61,7 +60,7 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
}
@Override
- public PDone expand(PCollection<T> input) {
+ public WriteResult expand(PCollection<T> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
@@ -177,6 +176,6 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
write.getTableDescription()))
.withSideInputs(jobIdTokenView));
- return PDone.in(input.getPipeline());
+ return WriteResult.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index cc6ec09..3c7b549 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -674,7 +673,7 @@ public class BigQueryIO {
/** Implementation of {@link #write}. */
@AutoValue
- public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
@VisibleForTesting
// Maximum number of files in a single partition.
static final int MAX_NUM_FILES = 10000;
@@ -984,7 +983,7 @@ public class BigQueryIO {
}
@Override
- public PDone expand(PCollection<T> input) {
+ public WriteResult expand(PCollection<T> input) {
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
index f667295..1fa26d1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -32,13 +32,12 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
/**
* PTransform that performs streaming BigQuery write. To increase consistency,
* it leverages BigQuery best effort de-dup mechanism.
*/
-class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
+class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
private final Write<T> write;
/** Constructor. */
@@ -52,7 +51,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
}
@Override
- public PDone expand(PCollection<T> input) {
+ public WriteResult expand(PCollection<T> input) {
// A naive implementation would be to simply stream data directly to BigQuery.
// However, this could occasionally lead to duplicated data, e.g., when
// a VM that runs this code is restarted and the code is re-run.
@@ -86,13 +85,6 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
write.getCreateDisposition(),
write.getTableDescription(),
write.getBigQueryServices())));
-
- // Note that the implementation to return PDone here breaks the
- // implicit assumption about the job execution order. If a user
- // implements a PTransform that takes PDone returned here as its
- // input, the transform may not necessarily be executed after
- // the BigQueryIO.Write.
-
- return PDone.in(input.getPipeline());
+ return WriteResult.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/475bf135/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
new file mode 100644
index 0000000..07fbc68
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.POutputValueBase;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+
+/**
+ * The result of a {@link BigQueryIO.Write} transform. It is public because it is the return type
+ * of the public {@link BigQueryIO.Write#expand}; it currently holds no output values.
+ */
+public final class WriteResult extends POutputValueBase {
+  /** Creates a {@link WriteResult} in the given {@link Pipeline}. */
+  static WriteResult in(Pipeline pipeline) {
+    return new WriteResult(pipeline);
+  }
+
+  /** Returns an empty list: the write does not yet expose any output values. */
+  @Override
+  public List<TaggedPValue> expand() {
+    return Collections.emptyList();
+  }
+
+  private WriteResult(Pipeline pipeline) {
+    super(pipeline);
+  }
+}
[2/2] beam git commit: This closes #2404
Posted by dh...@apache.org.
This closes #2404
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/65ffd6c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/65ffd6c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/65ffd6c6
Branch: refs/heads/master
Commit: 65ffd6c64bd1aed08dcb352962bab01b9bd9b3ca
Parents: 50fc63a 475bf13
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 4 11:56:59 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 4 11:56:59 2017 -0700
----------------------------------------------------------------------
.../complete/game/utils/WriteToBigQuery.java | 3 +-
.../game/utils/WriteWindowedToBigQuery.java | 3 +-
.../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 7 ++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +--
.../sdk/io/gcp/bigquery/StreamWithDeDup.java | 14 ++----
.../beam/sdk/io/gcp/bigquery/WriteResult.java | 47 ++++++++++++++++++++
6 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------