You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/04/30 20:18:58 UTC
[beam] branch master updated: [BEAM-9795] Support custom avro
DatumWriters when writing to BigQuery
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce75c4f [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
new 144b96d Merge pull request #11479 from [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
ce75c4f is described below
commit ce75c4f5bee2424dd8c24da90e9913d3daeb7f16
Author: steve <sn...@twitter.com>
AuthorDate: Tue Apr 21 11:10:59 2020 -0400
[BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
---
.../beam/sdk/io/gcp/bigquery/AvroRowWriter.java | 15 ++---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 75 ++++++++++++++++------
.../beam/sdk/io/gcp/bigquery/RowWriterFactory.java | 29 ++++++---
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 65 ++++++++++++++++++-
4 files changed, 145 insertions(+), 39 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
index a0509a6..74a0bb4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
@@ -20,28 +20,27 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.MimeTypes;
-class AvroRowWriter<T> extends BigQueryRowWriter<T> {
- private final DataFileWriter<GenericRecord> writer;
+class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
+ private final DataFileWriter<AvroT> writer;
private final Schema schema;
- private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord;
+ private final SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord;
AvroRowWriter(
String basename,
Schema schema,
- SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord)
+ SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord,
+ SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory)
throws Exception {
super(basename, MimeTypes.BINARY);
this.schema = schema;
this.toAvroRecord = toAvroRecord;
this.writer =
- new DataFileWriter<GenericRecord>(new GenericDatumWriter<>())
- .create(schema, getOutputStream());
+ new DataFileWriter<>(writerFactory.apply(schema)).create(schema, getOutputStream());
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b0399ca..24667e5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -51,7 +51,9 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
@@ -294,14 +296,16 @@ import org.slf4j.LoggerFactory;
* <ul>
* <li>{@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} (recommended) to
* write data using avro records.
+ * <li>{@link BigQueryIO.Write#withAvroWriter} to write avro data using a user-specified {@link
+ * DatumWriter} (and format function).
* <li>{@link BigQueryIO.Write#withFormatFunction(SerializableFunction)} to write data as json
* encoded {@link TableRow TableRows}.
* </ul>
*
- * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} is used, the table
- * schema MUST be specified using one of the {@link Write#withJsonSchema(String)}, {@link
- * Write#withJsonSchema(ValueProvider)}, {@link Write#withSchemaFromView(PCollectionView)} methods,
- * or {@link Write#to(DynamicDestinations)}.
+ * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} or {@link
+ * BigQueryIO.Write#withAvroWriter} is used, the table schema MUST be specified using one of the
+ * {@link Write#withJsonSchema(String)}, {@link Write#withJsonSchema(ValueProvider)}, {@link
+ * Write#withSchemaFromView(PCollectionView)} methods, or {@link Write#to(DynamicDestinations)}.
*
* <pre>{@code
* class Quote {
@@ -488,6 +492,9 @@ public class BigQueryIO {
*/
static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+ static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
+ GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
+
private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
DEFAULT_AVRO_SCHEMA_FACTORY =
new SerializableFunction<TableSchema, org.apache.avro.Schema>() {
@@ -1763,7 +1770,7 @@ public class BigQueryIO {
abstract SerializableFunction<T, TableRow> getFormatFunction();
@Nullable
- abstract SerializableFunction<AvroWriteRequest<T>, GenericRecord> getAvroFormatFunction();
+ abstract RowWriterFactory.AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();
@Nullable
abstract SerializableFunction<TableSchema, org.apache.avro.Schema> getAvroSchemaFactory();
@@ -1851,8 +1858,8 @@ public class BigQueryIO {
abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
- abstract Builder<T> setAvroFormatFunction(
- SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction);
+ abstract Builder<T> setAvroRowWriterFactory(
+ RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
abstract Builder<T> setAvroSchemaFactory(
SerializableFunction<TableSchema, org.apache.avro.Schema> avroSchemaFactory);
@@ -2056,13 +2063,43 @@ public class BigQueryIO {
}
/**
- * Formats the user's type into a {@link GenericRecord} to be written to BigQuery.
+ * Formats the user's type into a {@link GenericRecord} to be written to BigQuery. The
+ * GenericRecords are written as avro using the standard {@link GenericDatumWriter}.
*
* <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
*/
public Write<T> withAvroFormatFunction(
SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction) {
- return toBuilder().setAvroFormatFunction(avroFormatFunction).setOptimizeWrites(true).build();
+ return withAvroWriter(avroFormatFunction, GENERIC_DATUM_WRITER_FACTORY);
+ }
+
+ /**
+ * Writes the user's type as avro using the {@link DatumWriter} returned by writerFactory.
+ *
+ * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
+ *
+ * <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
+ */
+ public Write<T> withAvroWriter(
+ SerializableFunction<org.apache.avro.Schema, DatumWriter<T>> writerFactory) {
+ return withAvroWriter(AvroWriteRequest::getElement, writerFactory);
+ }
+
+ /**
+ * Converts the user's type to an avro record using the supplied avroFormatFunction. Records
+ * are then written using the supplied writer instances returned from writerFactory.
+ *
+ * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
+ *
+ * <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
+ */
+ public <AvroT> Write<T> withAvroWriter(
+ SerializableFunction<AvroWriteRequest<T>, AvroT> avroFormatFunction,
+ SerializableFunction<org.apache.avro.Schema, DatumWriter<AvroT>> writerFactory) {
+ return toBuilder()
+ .setOptimizeWrites(true)
+ .setAvroRowWriterFactory(RowWriterFactory.avroRecords(avroFormatFunction, writerFactory))
+ .build();
}
/**
@@ -2484,7 +2521,7 @@ public class BigQueryIO {
if (method != Method.FILE_LOADS) {
// we only support writing avro for FILE_LOADS
checkArgument(
- getAvroFormatFunction() == null,
+ getAvroRowWriterFactory() == null,
"Writing avro formatted data is only supported for FILE_LOADS, however "
+ "the method was %s",
method);
@@ -2546,8 +2583,8 @@ public class BigQueryIO {
PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
boolean optimizeWrites = getOptimizeWrites();
SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
- SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction =
- getAvroFormatFunction();
+ RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> avroRowWriterFactory =
+ (RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) getAvroRowWriterFactory();
boolean hasSchema =
getJsonSchema() != null
@@ -2559,8 +2596,8 @@ public class BigQueryIO {
optimizeWrites = true;
checkArgument(
- avroFormatFunction == null,
- "avroFormatFunction is unsupported when using Beam schemas.");
+ avroRowWriterFactory == null,
+ "avro avroFormatFunction is unsupported when using Beam schemas.");
if (formatFunction == null) {
// If no format function set, then we will automatically convert the input type to a
@@ -2593,10 +2630,10 @@ public class BigQueryIO {
Method method = resolveMethod(input);
if (optimizeWrites) {
RowWriterFactory<T, DestinationT> rowWriterFactory;
- if (avroFormatFunction != null) {
+ if (avroRowWriterFactory != null) {
checkArgument(
formatFunction == null,
- "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both.");
+ "Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set, not both.");
SerializableFunction<TableSchema, org.apache.avro.Schema> avroSchemaFactory =
getAvroSchemaFactory();
@@ -2607,9 +2644,7 @@ public class BigQueryIO {
+ "is set but no avroSchemaFactory is defined.");
avroSchemaFactory = DEFAULT_AVRO_SCHEMA_FACTORY;
}
- rowWriterFactory =
- RowWriterFactory.avroRecords(
- avroFormatFunction, avroSchemaFactory, dynamicDestinations);
+ rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations, avroSchemaFactory);
} else if (formatFunction != null) {
rowWriterFactory = RowWriterFactory.tableRows(formatFunction);
} else {
@@ -2634,7 +2669,7 @@ public class BigQueryIO {
rowWriterFactory,
method);
} else {
- checkArgument(avroFormatFunction == null);
+ checkArgument(avroRowWriterFactory == null);
checkArgument(
formatFunction != null,
"A function must be provided to convert the input type into a TableRow or "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
index d8e4ea6b..7229957 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
@@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
import org.apache.beam.sdk.transforms.SerializableFunction;
abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable {
@@ -74,29 +74,38 @@ abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable
}
}
- static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT> avroRecords(
- SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro,
- SerializableFunction<TableSchema, Schema> schemaFactory,
- DynamicDestinations<?, DestinationT> dynamicDestinations) {
- return new AvroRowWriterFactory<>(toAvro, schemaFactory, dynamicDestinations);
+ static <ElementT, AvroT, DestinationT>
+ AvroRowWriterFactory<ElementT, AvroT, DestinationT> avroRecords(
+ SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro,
+ SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory) {
+ return new AvroRowWriterFactory<>(toAvro, writerFactory, null, null);
}
- private static final class AvroRowWriterFactory<ElementT, DestinationT>
+ static final class AvroRowWriterFactory<ElementT, AvroT, DestinationT>
extends RowWriterFactory<ElementT, DestinationT> {
- private final SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro;
+ private final SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro;
+ private final SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory;
private final SerializableFunction<TableSchema, Schema> schemaFactory;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private AvroRowWriterFactory(
- SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro,
+ SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro,
+ SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory,
SerializableFunction<TableSchema, Schema> schemaFactory,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
this.toAvro = toAvro;
+ this.writerFactory = writerFactory;
this.schemaFactory = schemaFactory;
this.dynamicDestinations = dynamicDestinations;
}
+ AvroRowWriterFactory<ElementT, AvroT, DestinationT> prepare(
+ DynamicDestinations<?, DestinationT> dynamicDestinations,
+ SerializableFunction<TableSchema, Schema> schemaFactory) {
+ return new AvroRowWriterFactory<>(toAvro, writerFactory, schemaFactory, dynamicDestinations);
+ }
+
@Override
OutputType getOutputType() {
return OutputType.AvroGenericRecord;
@@ -107,7 +116,7 @@ abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable
throws Exception {
TableSchema tableSchema = dynamicDestinations.getSchema(destination);
Schema avroSchema = schemaFactory.apply(tableSchema);
- return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro);
+ return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro, writerFactory);
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 0e2064c..f3b9662 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -65,7 +65,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -783,6 +786,66 @@ public class BigQueryIOWriteTest implements Serializable {
}
@Test
+ public void testWriteAvroWithCustomWriter() throws Exception {
+ SerializableFunction<AvroWriteRequest<InputRecord>, GenericRecord> formatFunction =
+ r -> {
+ GenericRecord rec = new GenericData.Record(r.getSchema());
+ InputRecord i = r.getElement();
+ rec.put("strVal", i.strVal());
+ rec.put("longVal", i.longVal());
+ rec.put("doubleVal", i.doubleVal());
+ rec.put("instantVal", i.instantVal().getMillis() * 1000);
+ return rec;
+ };
+
+ SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>> customWriterFactory =
+ s ->
+ new GenericDatumWriter<GenericRecord>() {
+ @Override
+ protected void writeString(org.apache.avro.Schema schema, Object datum, Encoder out)
+ throws IOException {
+ super.writeString(schema, datum.toString() + "_custom", out);
+ }
+ };
+
+ p.apply(
+ Create.of(
+ InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")),
+ InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z")))
+ .withCoder(INPUT_RECORD_CODER))
+ .apply(
+ BigQueryIO.<InputRecord>write()
+ .to("dataset-id.table-id")
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("strVal").setType("STRING"),
+ new TableFieldSchema().setName("longVal").setType("INTEGER"),
+ new TableFieldSchema().setName("doubleVal").setType("FLOAT"),
+ new TableFieldSchema().setName("instantVal").setType("TIMESTAMP"))))
+ .withTestServices(fakeBqServices)
+ .withAvroWriter(formatFunction, customWriterFactory)
+ .withoutValidation());
+ p.run();
+
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(
+ new TableRow()
+ .set("strVal", "test_custom")
+ .set("longVal", "1")
+ .set("doubleVal", 1.0D)
+ .set("instantVal", "2019-01-01 00:00:00 UTC"),
+ new TableRow()
+ .set("strVal", "test2_custom")
+ .set("longVal", "2")
+ .set("doubleVal", 2.0D)
+ .set("instantVal", "2019-02-01 00:00:00 UTC")));
+ }
+
+ @Test
public void testStreamingWrite() throws Exception {
p.apply(
Create.of(
@@ -1352,7 +1415,7 @@ public class BigQueryIOWriteTest implements Serializable {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
- "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both");
+ "Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set, not both.");
p.apply(Create.empty(INPUT_RECORD_CODER))
.apply(
BigQueryIO.<InputRecord>write()