You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/19 03:50:59 UTC
[1/2] incubator-beam git commit: [BEAM-50] BigQueryIO: fix autocompleting project and test it
Repository: incubator-beam
Updated Branches:
refs/heads/master 5f20e5361 -> bf78e9667
[BEAM-50] BigQueryIO: fix autocompleting project and test it
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c876c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c876c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c876c2
Branch: refs/heads/master
Commit: e3c876c2616fe8fc5c549973e8444a6c55f12e77
Parents: 5f20e53
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 18 16:10:54 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 18 17:44:04 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 91 +++++++++++---------
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 40 ++++++++-
2 files changed, 89 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c876c2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index aff1e4c..d9debbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -526,7 +527,7 @@ public class BigQueryIO {
}
/**
- * Returns the table to write, or {@code null} if reading from a query instead.
+ * Returns the table to read, or {@code null} if reading from a query instead.
*/
public TableReference getTable() {
return table;
@@ -931,37 +932,34 @@ public class BigQueryIO {
public void validate(PCollection<TableRow> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
- TableReference table = getTable();
- if (table == null && tableRefFunction == null) {
- throw new IllegalStateException(
- "must set the table reference of a BigQueryIO.Write transform");
- }
- if (table != null && tableRefFunction != null) {
- throw new IllegalStateException(
- "Cannot set both a table reference and a table function for a BigQueryIO.Write "
- + "transform");
- }
+ // Exactly one of the table reference and the table function must be configured.
+ checkState(
+ jsonTableRef != null || tableRefFunction != null,
+ "must set the table reference of a BigQueryIO.Write transform");
+ checkState(
+ jsonTableRef == null || tableRefFunction == null,
+ "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ + " transform");
- if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) {
- throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
- + "however no schema was provided.");
- }
+ // Require a schema if creating one or more tables.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+ // The user specified a table.
+ if (jsonTableRef != null && validate) {
+ TableReference table = getTable();
- if (table != null && table.getProjectId() == null) {
// If user does not specify a project we assume the table to be located in the project
- // that owns the Dataflow job.
- String projectIdFromOptions = options.getProject();
- LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
- table.getTableId(), projectIdFromOptions));
- table.setProjectId(projectIdFromOptions);
- }
+ // configured in BigQueryOptions.
+ if (Strings.isNullOrEmpty(table.getProjectId())) {
+ table.setProjectId(options.getProject());
+ }
- // Check for destination table presence and emptiness for early failure notification.
- // Note that a presence check can fail if the table or dataset are created by earlier stages
- // of the pipeline. For these cases the withoutValidation method can be used to disable
- // the check.
- // Unfortunately we can't validate anything early if tableRefFunction is specified.
- if (table != null && validate) {
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
verifyDatasetPresence(options, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
verifyTablePresence(options, table);
@@ -972,16 +970,16 @@ public class BigQueryIO {
}
if (options.isStreaming() || tableRefFunction != null) {
- // We will use BigQuery's streaming write API -- validate support dispositions.
- if (createDisposition == CreateDisposition.CREATE_NEVER) {
- throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
- + "supported for unbounded PCollections or when using tablespec functions.");
- }
+ // We will use BigQuery's streaming write API -- validate supported dispositions.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
+ + " using a tablespec function.");
- if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
- throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
- + "supported for unbounded PCollections or when using tablespec functions.");
- }
+ checkArgument(
+ writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ + " when using a tablespec function.");
} else {
// We will use a BigQuery load job -- validate the temp location.
String tempLocation = options.getTempLocation();
@@ -1012,13 +1010,17 @@ public class BigQueryIO {
return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
}
+ TableReference table = fromJsonString(jsonTableRef, TableReference.class);
+ if (Strings.isNullOrEmpty(table.getProjectId())) {
+ table.setProjectId(options.getProject());
+ }
String jobIdToken = UUID.randomUUID().toString();
String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
BigQueryServices bqServices = getBigQueryServices();
return input.apply("Write", org.apache.beam.sdk.io.Write.to(
new BigQuerySink(
jobIdToken,
- jsonTableRef,
+ table,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
@@ -1047,7 +1049,8 @@ public class BigQueryIO {
return fromJsonString(jsonSchema, TableSchema.class);
}
- /** Returns the table reference, or {@code null} if a . */
+ /** Returns the table reference, or {@code null} if no table reference was set. */
+ @Nullable
public TableReference getTable() {
return fromJsonString(jsonTableRef, TableReference.class);
}
@@ -1086,7 +1089,7 @@ public class BigQueryIO {
public BigQuerySink(
String jobIdToken,
- @Nullable String jsonTable,
+ @Nullable TableReference table,
@Nullable String jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@@ -1095,7 +1098,13 @@ public class BigQueryIO {
BigQueryServices bqServices) {
super(tempFile, ".json");
this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
- this.jsonTable = jsonTable;
+ if (table == null) {
+ this.jsonTable = null;
+ } else {
+ checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
+ "Table %s should have a project specified", table);
+ this.jsonTable = toJsonString(table);
+ }
this.jsonSchema = jsonSchema;
this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
this.createDisposition = checkNotNull(createDisposition, "createDisposition");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c876c2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index b9af1e2..e1f8e4d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -31,6 +31,8 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
import org.apache.beam.sdk.util.BigQueryServices.Status;
import org.apache.beam.sdk.util.CoderUtils;
@@ -339,7 +341,7 @@ public class BigQueryIOTest {
new TableRow().set("name", "b").set("number", 2),
new TableRow().set("name", "c").set("number", 3)))
.setCoder(TableRowJsonCoder.of())
- .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+ .apply(BigQueryIO.Write.to("dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
@@ -604,4 +606,40 @@ public class BigQueryIOTest {
assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
}
+
+ @Test
+ public void testWriteValidateFailsCreateNoSchema() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("no schema was provided");
+ TestPipeline.create()
+ .apply(Create.<TableRow>of())
+ .apply(BigQueryIO.Write
+ .to("dataset.table")
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
+ }
+
+ @Test
+ public void testWriteValidateFailsTableAndTableSpec() {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Cannot set both a table reference and a table function");
+ TestPipeline.create()
+ .apply(Create.<TableRow>of())
+ .apply(BigQueryIO.Write
+ .to("dataset.table")
+ .to(new SerializableFunction<BoundedWindow, String>() {
+ @Override
+ public String apply(BoundedWindow input) {
+ return null;
+ }
+ }));
+ }
+
+ @Test
+ public void testWriteValidateFailsNoTableAndNoTableSpec() {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
+ TestPipeline.create()
+ .apply(Create.<TableRow>of())
+ .apply(BigQueryIO.Write.named("name"));
+ }
}
[2/2] incubator-beam git commit: Closes #205
Posted by dh...@apache.org.
Closes #205
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf78e966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf78e966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf78e966
Branch: refs/heads/master
Commit: bf78e966716c2630573c8ea135af9807053253b5
Parents: 5f20e53 e3c876c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 18 18:50:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 18 18:50:39 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 91 +++++++++++---------
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 40 ++++++++-
2 files changed, 89 insertions(+), 42 deletions(-)
----------------------------------------------------------------------