You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/03/31 23:24:50 UTC
[beam] branch master updated: Merge pull request #7840: [BEAM-6602]
BigQueryIO.write natively understands Beam schemas
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 90bf972 Merge pull request #7840: [BEAM-6602] BigQueryIO.write natively understands Beam schemas
90bf972 is described below
commit 90bf97252500317fca287182dcb8a8e96f3d5878
Author: reuvenlax <re...@google.com>
AuthorDate: Sun Mar 31 16:24:33 2019 -0700
Merge pull request #7840: [BEAM-6602] BigQueryIO.write natively understands Beam schemas
---
...DefaultCoderCloudObjectTranslatorRegistrar.java | 2 -
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 113 +++++++++++---
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 138 ++++++++++--------
.../sdk/io/gcp/bigquery/DynamicDestinations.java | 6 +-
.../gcp/bigquery/DynamicDestinationsHelpers.java | 30 +++-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 162 +++++++++++++++++----
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 20 +--
7 files changed, 340 insertions(+), 131 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index d43106b..25d6df9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
@@ -98,7 +97,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
KeyPrefixCoder.class,
RandomAccessDataCoder.class,
StringUtf8Coder.class,
- TableDestinationCoder.class,
TableDestinationCoderV2.class,
TableRowJsonCoder.class,
TextualIntegerCoder.class,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 98b114f..85cdba4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -201,7 +201,16 @@ import org.slf4j.LoggerFactory;
* BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
*
* <pre>{@code
- * class Quote { Instant timestamp; String exchange; String symbol; double price; }
+ * class Quote {
+ * final Instant timestamp;
+ * final String exchange;
+ * final String symbol;
+ * final double price;
+ *
+ * Quote(Instant timestamp, String exchange, String symbol, double price) {
+ * // initialize all member variables.
+ * }
+ * }
*
* PCollection<Quote> quotes = ...
*
@@ -223,6 +232,34 @@ import org.slf4j.LoggerFactory;
* written to must already exist. Unbounded PCollections can only be written using {@link
* Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
*
+ * <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on
+ * the input PCollection. Beam can also automatically format the input into a TableRow in this case,
+ * if no format function is provided. If the Quote POJO from the above example is annotated with a
+ * schema, as shown below, Beam infers a schema for the quotes PCollection and the write can be done more simply:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * class Quote {
+ * final Instant timestamp;
+ * final String exchange;
+ * final String symbol;
+ * final double price;
+ *
+ * {@literal @}SchemaCreate
+ * Quote(Instant timestamp, String exchange, String symbol, double price) {
+ * // initialize all member variables.
+ * }
+ * }
+ *
+ * PCollection<Quote> quotes = ...
+ *
+ * quotes.apply(BigQueryIO
+ * .<Quote>write()
+ * .to("my-project:my_dataset.my_table")
+ * .useBeamSchema()
+ * .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ * }</pre>
+ *
* <h3>Loading historical data into time-partitioned BigQuery tables</h3>
*
* <p>To load historical data into a time-partitioned BigQuery table, specify {@link
@@ -1331,6 +1368,7 @@ public class BigQueryIO {
.setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION)
.setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)
.setOptimizeWrites(false)
+ .setUseBeamSchema(false)
.build();
}
@@ -1452,6 +1490,8 @@ public class BigQueryIO {
abstract Boolean getOptimizeWrites();
+ abstract Boolean getUseBeamSchema();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -1511,6 +1551,8 @@ public class BigQueryIO {
abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
+ abstract Builder<T> setUseBeamSchema(Boolean useBeamSchema);
+
abstract Write<T> build();
}
@@ -1618,7 +1660,6 @@ public class BigQueryIO {
/** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
- checkArgument(formatFunction != null, "formatFunction can not be null");
return toBuilder().setFormatFunction(formatFunction).build();
}
@@ -1826,10 +1867,20 @@ public class BigQueryIO {
* BigQuery. Not enabled by default in order to maintain backwards compatibility.
*/
@Experimental
- public Write<T> withOptimizedWrites() {
+ public Write<T> optimizedWrites() {
return toBuilder().setOptimizeWrites(true).build();
}
+ /**
+   * Infers the BigQuery table schema from the input PCollection's Beam schema. If no
+   * formatFunction is set, BigQueryIO also automatically converts the input records into
+   * TableRows that match that schema. The input PCollection must have a Beam schema.
+ */
+ @Experimental
+ public Write<T> useBeamSchema() {
+ return toBuilder().setUseBeamSchema(true).build();
+ }
+
@VisibleForTesting
/** This method is for test usage only */
public Write<T> withTestServices(BigQueryServices testServices) {
@@ -1910,19 +1961,6 @@ public class BigQueryIO {
|| getDynamicDestinations() != null,
"must set the table reference of a BigQueryIO.Write transform");
- checkArgument(
- getFormatFunction() != null,
- "A function must be provided to convert type into a TableRow. "
- + "use BigQueryIO.Write.withFormatFunction to provide a formatting function.");
-
- // Require a schema if creating one or more tables.
- checkArgument(
- getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
- || getJsonSchema() != null
- || getDynamicDestinations() != null
- || getSchemaFromView() != null,
- "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
List<?> allToArgs =
Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
checkArgument(
@@ -1995,8 +2033,9 @@ public class BigQueryIO {
// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
if (getJsonTimePartitioning() != null) {
dynamicDestinations =
- new ConstantTimePartitioningDestinations(
- dynamicDestinations, getJsonTimePartitioning());
+ new ConstantTimePartitioningDestinations<>(
+ (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+ getJsonTimePartitioning());
}
}
return expandTyped(input, dynamicDestinations);
@@ -2004,6 +2043,38 @@ public class BigQueryIO {
private <DestinationT> WriteResult expandTyped(
PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ boolean optimizeWrites = getOptimizeWrites();
+ SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+ if (getUseBeamSchema()) {
+ checkArgument(input.hasSchema());
+ optimizeWrites = true;
+ if (formatFunction == null) {
+        // No format function was set, so automatically convert the input type to a
+        // TableRow using the input's Beam schema.
+ formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
+ }
+ // Infer the TableSchema from the input Beam schema.
+ TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
+ dynamicDestinations =
+ new ConstantSchemaDestinations<>(
+ dynamicDestinations,
+ StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+ } else {
+ // Require a schema if creating one or more tables.
+ checkArgument(
+ getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
+ || getJsonSchema() != null
+ || getDynamicDestinations() != null
+ || getSchemaFromView() != null,
+ "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+ }
+
+ checkArgument(
+ formatFunction != null,
+ "A function must be provided to convert type into a TableRow. "
+ + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."
+ + "A format function is not required if Beam schemas are used.");
+
Coder<DestinationT> destinationCoder = null;
try {
destinationCoder =
@@ -2014,7 +2085,7 @@ public class BigQueryIO {
}
Method method = resolveMethod(input);
- if (getOptimizeWrites()) {
+ if (optimizeWrites) {
PCollection<KV<DestinationT, T>> rowsWithDestination =
input
.apply(
@@ -2026,12 +2097,12 @@ public class BigQueryIO {
input.getCoder(),
destinationCoder,
dynamicDestinations,
- getFormatFunction(),
+ formatFunction,
method);
} else {
PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
input
- .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
+ .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction))
.setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
return continueExpandTyped(
rowsWithDestination,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index ceab03a..6d1d15a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -26,41 +26,30 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
-/**
- * Utility methods for BigQuery related operations.
- *
- * <p><b>Example: Writing to BigQuery</b>
- *
- * <pre>{@code
- * PCollection<Row> rows = ...;
- *
- * rows.apply(BigQueryIO.<Row>write()
- * .withSchema(BigQueryUtils.toTableSchema(rows))
- * .withFormatFunction(BigQueryUtils.toTableRow())
- * .to("my-project:my_dataset.my_table"));
- * }</pre>
- */
+/** Utility methods for BigQuery related operations. */
public class BigQueryUtils {
private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
ImmutableMap.<TypeName, StandardSQLTypeName>builder()
@@ -76,6 +65,7 @@ public class BigQueryUtils {
.put(TypeName.ROW, StandardSQLTypeName.STRUCT)
.put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
.put(TypeName.STRING, StandardSQLTypeName.STRING)
+ .put(TypeName.BYTES, StandardSQLTypeName.BYTES)
.build();
private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =
@@ -137,12 +127,18 @@ public class BigQueryUtils {
}
if (TypeName.ARRAY == type.getTypeName()) {
type = type.getCollectionElementType();
+ if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
+ throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");
+ }
field.setMode(Mode.REPEATED.toString());
}
if (TypeName.ROW == type.getTypeName()) {
Schema subType = type.getRowSchema();
field.setFields(toTableFieldSchema(subType));
}
+ if (TypeName.MAP == type.getTypeName()) {
+ throw new IllegalArgumentException("Maps are not supported in BigQuery.");
+ }
field.setType(toStandardSQLTypeName(type).toString());
fields.add(field);
@@ -155,17 +151,18 @@ public class BigQueryUtils {
return new TableSchema().setFields(toTableFieldSchema(schema));
}
- /** Convert a Beam {@link PCollection} to a BigQuery {@link TableSchema}. */
- public static TableSchema toTableSchema(PCollection<Row> rows) {
- RowCoder coder = (RowCoder) rows.getCoder();
- return toTableSchema(coder.getSchema());
- }
-
- private static final SerializableFunction<Row, TableRow> TO_TABLE_ROW = new ToTableRow();
+ private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
+ new ToTableRow(SerializableFunctions.identity());
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
public static SerializableFunction<Row, TableRow> toTableRow() {
- return TO_TABLE_ROW;
+ return ROW_TO_TABLE_ROW;
+ }
+
+  /** Convert a value of a Beam schema type to a BigQuery {@link TableRow} using {@code toRow}. */
+ public static <T> SerializableFunction<T, TableRow> toTableRow(
+ SerializableFunction<T, Row> toRow) {
+ return new ToTableRow<>(toRow);
}
/** Convert {@link SchemaAndRecord} to a Beam {@link Row}. */
@@ -174,10 +171,16 @@ public class BigQueryUtils {
}
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
- private static class ToTableRow implements SerializableFunction<Row, TableRow> {
+ private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {
+ private final SerializableFunction<T, Row> toRow;
+
+ ToTableRow(SerializableFunction<T, Row> toRow) {
+ this.toRow = toRow;
+ }
+
@Override
- public TableRow apply(Row input) {
- return toTableRow(input);
+ public TableRow apply(T input) {
+ return toTableRow(toRow.apply(input));
}
}
@@ -214,40 +217,61 @@ public class BigQueryUtils {
TableRow output = new TableRow();
for (int i = 0; i < row.getFieldCount(); i++) {
Object value = row.getValue(i);
-
Field schemaField = row.getSchema().getField(i);
- TypeName type = schemaField.getType().getTypeName();
-
- switch (type) {
- case ARRAY:
- type = schemaField.getType().getCollectionElementType().getTypeName();
- if (TypeName.ROW == type) {
- List<Row> rows = (List<Row>) value;
- List<TableRow> tableRows = new ArrayList<>(rows.size());
- for (int j = 0; j < rows.size(); j++) {
- tableRows.add(toTableRow(rows.get(j)));
- }
- value = tableRows;
- }
- break;
- case ROW:
- value = toTableRow((Row) value);
- break;
- case DATETIME:
- DateTimeFormatter patternFormat =
- new DateTimeFormatterBuilder()
- .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
- .toFormatter();
- value = value == null ? null : ((Instant) value).toDateTime().toString(patternFormat);
- break;
- default:
- value = row.getValue(i);
- break;
+ output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value));
+ }
+ return output;
+ }
+
+ private static Object fromBeamField(FieldType fieldType, Object fieldValue) {
+ if (fieldValue == null) {
+ if (!fieldType.getNullable()) {
+ throw new IllegalArgumentException("Field is not nullable.");
}
+ return null;
+ }
- output = output.set(schemaField.getName(), value);
+ switch (fieldType.getTypeName()) {
+ case ARRAY:
+ FieldType elementType = fieldType.getCollectionElementType();
+ List items = (List) fieldValue;
+ List convertedItems = Lists.newArrayListWithCapacity(items.size());
+ for (Object item : items) {
+ convertedItems.add(fromBeamField(elementType, item));
+ }
+ return convertedItems;
+
+ case ROW:
+ return toTableRow((Row) fieldValue);
+
+ case DATETIME:
+ DateTimeFormatter patternFormat =
+ new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+ .toFormatter();
+ return ((Instant) fieldValue).toDateTime().toString(patternFormat);
+
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ case BOOLEAN:
+ return fieldValue.toString();
+
+ case DECIMAL:
+ return fieldValue.toString();
+
+ case BYTES:
+ ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
+ byte[] bytes = new byte[byteBuffer.limit()];
+ byteBuffer.get(bytes);
+ return BaseEncoding.base64().encode(bytes);
+
+ default:
+ return fieldValue;
}
- return output;
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c1567cc..ea3d435 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -112,11 +112,7 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
return sideInputAccessor.sideInput(view);
}
- final void setSideInputAccessor(SideInputAccessor sideInputAccessor) {
- this.sideInputAccessor = sideInputAccessor;
- }
-
- final void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+ void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 99c3f6f..d006220 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -23,10 +23,13 @@ import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -75,7 +78,7 @@ class DynamicDestinationsHelpers {
@Override
public Coder<TableDestination> getDestinationCoder() {
- return TableDestinationCoder.of();
+ return TableDestinationCoderV2.of();
}
}
@@ -149,25 +152,42 @@ class DynamicDestinationsHelpers {
}
@Override
+ Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return inner.getDestinationCoderWithDefault(registry);
+ }
+
+ @Override
+ public List<PCollectionView<?>> getSideInputs() {
+ return inner.getSideInputs();
+ }
+
+ @Override
+ void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+ super.setSideInputAccessorFromProcessContext(context);
+ inner.setSideInputAccessorFromProcessContext(context);
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(this).add("inner", inner).toString();
}
}
/** Returns the same schema for every table. */
- static class ConstantSchemaDestinations<T>
- extends DelegatingDynamicDestinations<T, TableDestination> {
+ static class ConstantSchemaDestinations<T, DestinationT>
+ extends DelegatingDynamicDestinations<T, DestinationT> {
@Nullable private final ValueProvider<String> jsonSchema;
ConstantSchemaDestinations(
- DynamicDestinations<T, TableDestination> inner, ValueProvider<String> jsonSchema) {
+ DynamicDestinations<T, DestinationT> inner, ValueProvider<String> jsonSchema) {
super(inner);
checkArgument(jsonSchema != null, "jsonSchema can not be null");
this.jsonSchema = jsonSchema;
}
@Override
- public TableSchema getSchema(TableDestination destination) {
+ public TableSchema getSchema(DestinationT destination) {
String jsonSchema = this.jsonSchema.get();
checkArgument(jsonSchema != null, "jsonSchema can not be null");
return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index df18467..a2ea13b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -61,8 +61,14 @@ import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -84,6 +90,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -166,7 +173,7 @@ public class BigQueryIOWriteTest implements Serializable {
}
// Create an intermediate type to ensure that coder inference up the inheritance tree is tested.
- abstract static class StringIntegerDestinations extends DynamicDestinations<String, Integer> {}
+ abstract static class StringLongDestinations extends DynamicDestinations<String, Long> {}
@Test
public void testWriteEmptyPCollection() throws Exception {
@@ -193,15 +200,28 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteDynamicDestinationsBatch() throws Exception {
- writeDynamicDestinations(false);
+ writeDynamicDestinations(false, false);
+ }
+
+ @Test
+ public void testWriteDynamicDestinationsBatchWithSchemas() throws Exception {
+ writeDynamicDestinations(false, true);
}
@Test
public void testWriteDynamicDestinationsStreaming() throws Exception {
- writeDynamicDestinations(true);
+ writeDynamicDestinations(true, false);
}
- public void writeDynamicDestinations(boolean streaming) throws Exception {
+ @Test
+ public void testWriteDynamicDestinationsStreamingWithSchemas() throws Exception {
+ writeDynamicDestinations(true, true);
+ }
+
+ public void writeDynamicDestinations(boolean streaming, boolean schemas) throws Exception {
+ final Schema schema =
+ Schema.builder().addField("name", FieldType.STRING).addField("id", FieldType.INT32).build();
+
final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)");
final PCollectionView<List<String>> sideInput1 =
@@ -231,43 +251,45 @@ public class BigQueryIOWriteTest implements Serializable {
users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
}
+ if (schemas) {
+ users =
+ users.setSchema(
+ schema,
+ user -> {
+ Matcher matcher = userPattern.matcher(user);
+ checkState(matcher.matches());
+ return Row.withSchema(schema)
+ .addValue(matcher.group(1))
+ .addValue(Integer.valueOf(matcher.group(2)))
+ .build();
+ },
+ r -> r.getString(0) + r.getInt32(1));
+ }
+
// Use a partition decorator to verify that partition decorators are supported.
final String partitionDecorator = "20171127";
- users.apply(
- "WriteBigQuery",
+ BigQueryIO.Write<String> write =
BigQueryIO.<String>write()
.withTestServices(fakeBqServices)
.withMaxFilesPerBundle(5)
.withMaxFileSize(10)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withFormatFunction(
- user -> {
- Matcher matcher = userPattern.matcher(user);
- if (matcher.matches()) {
- return new TableRow()
- .set("name", matcher.group(1))
- .set("id", Integer.valueOf(matcher.group(2)));
- }
- throw new RuntimeException("Unmatching element " + user);
- })
.to(
- new StringIntegerDestinations() {
+ new StringLongDestinations() {
@Override
- public Integer getDestination(ValueInSingleWindow<String> element) {
+ public Long getDestination(ValueInSingleWindow<String> element) {
assertThat(
element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
Matcher matcher = userPattern.matcher(element.getValue());
- if (matcher.matches()) {
- // Since we name tables by userid, we can simply store an Integer to represent
- // a table.
- return Integer.valueOf(matcher.group(2));
- }
- throw new RuntimeException("Unmatching destination " + element.getValue());
+ checkState(matcher.matches());
+ // Since we name tables by userid, we can simply store a Long to represent
+ // a table.
+ return Long.valueOf(matcher.group(2));
}
@Override
- public TableDestination getTable(Integer userId) {
+ public TableDestination getTable(Long userId) {
verifySideInputs();
// Each user in it's own table.
return new TableDestination(
@@ -276,7 +298,7 @@ public class BigQueryIOWriteTest implements Serializable {
}
@Override
- public TableSchema getSchema(Integer userId) {
+ public TableSchema getSchema(Long userId) {
verifySideInputs();
return new TableSchema()
.setFields(
@@ -299,21 +321,33 @@ public class BigQueryIOWriteTest implements Serializable {
allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
}
})
- .withoutValidation());
+ .withoutValidation();
+ if (schemas) {
+ write = write.useBeamSchema();
+ } else {
+ write =
+ write.withFormatFunction(
+ user -> {
+ Matcher matcher = userPattern.matcher(user);
+ checkState(matcher.matches());
+ return new TableRow().set("name", matcher.group(1)).set("id", matcher.group(2));
+ });
+ }
+ users.apply("WriteBigQuery", write);
p.run();
- Map<Integer, List<TableRow>> expectedTableRows = Maps.newHashMap();
+ Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
for (String anUserList : userList) {
Matcher matcher = userPattern.matcher(anUserList);
checkState(matcher.matches());
String nickname = matcher.group(1);
- int userid = Integer.valueOf(matcher.group(2));
+ Long userid = Long.valueOf(matcher.group(2));
List<TableRow> expected =
expectedTableRows.computeIfAbsent(userid, k -> Lists.newArrayList());
- expected.add(new TableRow().set("name", nickname).set("id", userid));
+ expected.add(new TableRow().set("name", nickname).set("id", userid.toString()));
}
- for (Map.Entry<Integer, List<TableRow>> entry : expectedTableRows.entrySet()) {
+ for (Map.Entry<Long, List<TableRow>> entry : expectedTableRows.entrySet()) {
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
@@ -577,6 +611,72 @@ public class BigQueryIOWriteTest implements Serializable {
new TableRow().set("name", "d").set("number", 4)));
}
+ @DefaultSchema(JavaFieldSchema.class)
+ static class SchemaPojo {
+ final String name;
+ final int number;
+
+ @SchemaCreate
+ SchemaPojo(String name, int number) {
+ this.name = name;
+ this.number = number;
+ }
+ }
+
+ @Test
+ public void testSchemaWriteLoads() throws Exception {
+ p.apply(
+ Create.of(
+ new SchemaPojo("a", 1),
+ new SchemaPojo("b", 2),
+ new SchemaPojo("c", 3),
+ new SchemaPojo("d", 4)))
+ .apply(
+ BigQueryIO.<SchemaPojo>write()
+ .to("project-id:dataset-id.table-id")
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(Method.FILE_LOADS)
+ .useBeamSchema()
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+ p.run();
+
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"),
+ new TableRow().set("name", "d").set("number", "4")));
+ }
+
+ @Test
+ public void testSchemaWriteStreams() throws Exception {
+ p.apply(
+ Create.of(
+ new SchemaPojo("a", 1),
+ new SchemaPojo("b", 2),
+ new SchemaPojo("c", 3),
+ new SchemaPojo("d", 4)))
+ .apply(
+ BigQueryIO.<SchemaPojo>write()
+ .to("project-id:dataset-id.table-id")
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(Method.STREAMING_INSERTS)
+ .useBeamSchema()
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+ p.run();
+
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"),
+ new TableRow().set("name", "d").set("number", "4")));
+ }
+
/**
* A generic window function that allows partitioning data into windows by a string value.
*
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 2f599d7..3e8a880 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -136,17 +136,17 @@ public class BigQueryUtilsTest {
TableRow row = toTableRow().apply(FLAT_ROW);
assertThat(row.size(), equalTo(5));
- assertThat(row, hasEntry("id", 123L));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("id", "123"));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
}
@Test
public void testToTableRow_array() {
TableRow row = toTableRow().apply(ARRAY_ROW);
- assertThat(row, hasEntry("ids", Arrays.asList(123L, 124L)));
+ assertThat(row, hasEntry("ids", Arrays.asList("123", "124")));
assertThat(row.size(), equalTo(1));
}
@@ -157,10 +157,10 @@ public class BigQueryUtilsTest {
assertThat(row.size(), equalTo(1));
row = (TableRow) row.get("row");
assertThat(row.size(), equalTo(5));
- assertThat(row, hasEntry("id", 123L));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("id", "123"));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
}
@Test
@@ -170,10 +170,10 @@ public class BigQueryUtilsTest {
assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("rows")).get(0);
assertThat(row.size(), equalTo(5));
- assertThat(row, hasEntry("id", 123L));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("id", "123"));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
}
@Test