You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/08 23:17:37 UTC
[1/2] beam git commit: This closes #2953
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 7dfc45563 -> 8f1834432
This closes #2953
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40bdbcb2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40bdbcb2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40bdbcb2
Branch: refs/heads/release-2.0.0
Commit: 40bdbcb28d5507c601d0791059c3f6ac9581a8f8
Parents: 7dfc455
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 8 13:41:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 8 16:00:33 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 +++----
.../bigquery/DynamicDestinationsHelpers.java | 3 ++-
.../sdk/io/gcp/bigquery/TableDestination.java | 13 +++++++++---
.../io/gcp/bigquery/TableDestinationCoder.java | 19 +++++++++---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 ++++++++++----------
5 files changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 304864a..8fb05ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -641,7 +641,6 @@ public class BigQueryIO {
public static <T> Write<T> write() {
return new AutoValue_BigQueryIO_Write.Builder<T>()
.setValidate(true)
- .setTableDescription("")
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
@@ -684,7 +683,7 @@ public class BigQueryIO {
abstract CreateDisposition getCreateDisposition();
abstract WriteDisposition getWriteDisposition();
/** Table description. Default is empty. */
- abstract String getTableDescription();
+ @Nullable abstract String getTableDescription();
/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();
abstract BigQueryServices getBigQueryServices();
@@ -1027,8 +1026,8 @@ public class BigQueryIO {
.withLabel("Table WriteDisposition"))
.addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription())
- .withLabel("Table Description"), "");
+ .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
+ .withLabel("Table Description"));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 72a3314..530e2b6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -40,9 +40,10 @@ class DynamicDestinationsHelpers {
*/
static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> {
private final ValueProvider<String> tableSpec;
+ @Nullable
private final String tableDescription;
- ConstantTableDestinations(ValueProvider<String> tableSpec, String tableDescription) {
+ ConstantTableDestinations(ValueProvider<String> tableSpec, @Nullable String tableDescription) {
this.tableSpec = tableSpec;
this.tableDescription = tableDescription;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 7a82c54..ecf35d8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
import java.io.Serializable;
import java.util.Objects;
+import javax.annotation.Nullable;
/**
* Encapsulates a BigQuery table destination.
@@ -28,15 +29,16 @@ import java.util.Objects;
public class TableDestination implements Serializable {
private static final long serialVersionUID = 1L;
private final String tableSpec;
+ @Nullable
private final String tableDescription;
- public TableDestination(String tableSpec, String tableDescription) {
+ public TableDestination(String tableSpec, @Nullable String tableDescription) {
this.tableSpec = tableSpec;
this.tableDescription = tableDescription;
}
- public TableDestination(TableReference tableReference, String tableDescription) {
+ public TableDestination(TableReference tableReference, @Nullable String tableDescription) {
this.tableSpec = BigQueryHelpers.toTableSpec(tableReference);
this.tableDescription = tableDescription;
}
@@ -49,13 +51,18 @@ public class TableDestination implements Serializable {
return BigQueryHelpers.parseTableSpec(tableSpec);
}
+ @Nullable
public String getTableDescription() {
return tableDescription;
}
@Override
public String toString() {
- return "tableSpec: " + tableSpec + " tableDescription: " + tableDescription;
+ String toString = "tableSpec: " + tableSpec;
+ if (tableDescription != null) {
+ toString += " tableDescription: " + tableDescription;
+ }
+ return toString;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 3059e2a..01bc558 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -22,13 +22,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/** A coder for {@link TableDestination} objects. */
public class TableDestinationCoder extends AtomicCoder<TableDestination> {
private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+ private static final Coder<String> tableSpecCoder = StringUtf8Coder.of();
+ private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of());
public static TableDestinationCoder of() {
return INSTANCE;
@@ -40,19 +43,17 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- stringCoder.encode(value.getTableSpec(), outStream, context.nested());
- stringCoder.encode(value.getTableDescription(), outStream, context.nested());
+ tableSpecCoder.encode(value.getTableSpec(), outStream, context.nested());
+ tableDescriptionCoder.encode(value.getTableDescription(), outStream, context);
}
@Override
public TableDestination decode(InputStream inStream, Context context) throws IOException {
- return new TableDestination(
- stringCoder.decode(inStream, context.nested()),
- stringCoder.decode(inStream, context.nested()));
+ String tableSpec = tableSpecCoder.decode(inStream, context.nested());
+ String tableDescription = tableDescriptionCoder.decode(inStream, context);
+ return new TableDestination(tableSpec, tableDescription);
}
@Override
- public void verifyDeterministic() throws NonDeterministicException {
- return;
- }
+ public void verifyDeterministic() throws NonDeterministicException {}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/40bdbcb2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 6c15f87..0d3f000 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -595,6 +595,7 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "c").set("number", 3))
.withCoder(TableRowJsonCoder.of()))
.apply(BigQueryIO.writeTableRows().to("dataset-id.table-id")
+ .withTableDescription(null)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
@@ -981,7 +982,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.writeTableRows().to("foo.com:project:somedataset.sometable");
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
@@ -999,7 +1000,7 @@ public class BigQueryIOTest implements Serializable {
null,
CreateDisposition.CREATE_IF_NEEDED,
WriteDisposition.WRITE_EMPTY,
- "",
+ null,
false);
}
@@ -1010,7 +1011,7 @@ public class BigQueryIOTest implements Serializable {
checkWriteObject(
write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
- "");
+ null);
}
@Test
@@ -1022,7 +1023,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to(table);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
@@ -1032,7 +1033,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.<TableRow>write().to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
+ schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
@@ -1042,7 +1043,7 @@ public class BigQueryIOTest implements Serializable {
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, "");
+ null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
}
@Test
@@ -1052,7 +1053,7 @@ public class BigQueryIOTest implements Serializable {
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
@@ -1062,7 +1063,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
}
@Test
@@ -1072,7 +1073,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
}
@Test
@@ -1082,7 +1083,7 @@ public class BigQueryIOTest implements Serializable {
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
- null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
+ null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
[2/2] beam git commit: This closes #2970: Cherry pick #2953 to release-2.0.0
Posted by jk...@apache.org.
This closes #2970: Cherry pick #2953 to release-2.0.0
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f183443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f183443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f183443
Branch: refs/heads/release-2.0.0
Commit: 8f18344329d3af5882886a50a841d01062d408ef
Parents: 7dfc455 40bdbcb
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 8 16:06:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 8 16:06:15 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 +++----
.../bigquery/DynamicDestinationsHelpers.java | 3 ++-
.../sdk/io/gcp/bigquery/TableDestination.java | 13 +++++++++---
.../io/gcp/bigquery/TableDestinationCoder.java | 19 +++++++++---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 ++++++++++----------
5 files changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------