You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/18 21:35:29 UTC
[2/2] incubator-beam git commit: [BEAM-50] BigQueryIO: move write validation to validate() from apply()
[BEAM-50] BigQueryIO: move write validation to validate() from apply()
Because the checks now live in validate() rather than apply(), validation still happens even when a runner overrides the transform's apply().
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3fe10227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3fe10227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3fe10227
Branch: refs/heads/master
Commit: 3fe10227935e1b2a5d8af82338cb8d635f1b564e
Parents: becc1ef
Author: Dan Halperin <dh...@google.com>
Authored: Sat Apr 16 12:19:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 18 12:34:23 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 52 ++++++++++++--------
1 file changed, 31 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3fe10227/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index eb99637..aff1e4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -441,9 +441,6 @@ public class BigQueryIO {
return new Bound(name, query, table, validate, false);
}
- /**
- * Validates the current {@link PTransform}.
- */
@Override
public void validate(PInput input) {
if (table == null && query == null) {
@@ -931,7 +928,7 @@ public class BigQueryIO {
}
@Override
- public PDone apply(PCollection<TableRow> input) {
+ public void validate(PCollection<TableRow> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
TableReference table = getTable();
@@ -963,7 +960,7 @@ public class BigQueryIO {
// Note that a presence check can fail if the table or dataset are created by earlier stages
// of the pipeline. For these cases the withoutValidation method can be used to disable
// the check.
- // Unfortunately we can't validate anything early in case tableRefFunction is specified.
+ // Unfortunately we can't validate anything early if tableRefFunction is specified.
if (table != null && validate) {
verifyDatasetPresence(options, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
@@ -974,9 +971,8 @@ public class BigQueryIO {
}
}
- // In streaming, BigQuery write is taken care of by StreamWithDeDup transform.
- // We also currently do this if a tablespec function is specified.
if (options.isStreaming() || tableRefFunction != null) {
+ // We will use BigQuery's streaming write API -- validate support dispositions.
if (createDisposition == CreateDisposition.CREATE_NEVER) {
throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
+ "supported for unbounded PCollections or when using tablespec functions.");
@@ -986,24 +982,38 @@ public class BigQueryIO {
throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
+ "supported for unbounded PCollections or when using tablespec functions.");
}
-
- return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema()));
+ } else {
+ // We will use a BigQuery load job -- validate the temp location.
+ String tempLocation = options.getTempLocation();
+ checkArgument(
+ !Strings.isNullOrEmpty(tempLocation),
+ "BigQueryIO.Write needs a GCS temp location to store temp files.");
+ if (testBigQueryServices == null) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+ tempLocation),
+ e);
+ }
+ }
}
+ }
- String tempLocation = options.getTempLocation();
- checkArgument(!Strings.isNullOrEmpty(tempLocation),
- "BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (testBigQueryServices == null) {
- try {
- GcsPath.fromUri(tempLocation);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
- tempLocation), e);
- }
+ @Override
+ public PDone apply(PCollection<TableRow> input) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
+ // and BigQuery's streaming import API.
+ if (options.isStreaming() || tableRefFunction != null) {
+ return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
}
+
String jobIdToken = UUID.randomUUID().toString();
- String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken;
+ String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
BigQueryServices bqServices = getBigQueryServices();
return input.apply("Write", org.apache.beam.sdk.io.Write.to(
new BigQuerySink(