Posted to commits@beam.apache.org by pa...@apache.org on 2020/04/30 20:18:58 UTC

[beam] branch master updated: [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce75c4f  [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
     new 144b96d  Merge pull request #11479 from [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
ce75c4f is described below

commit ce75c4f5bee2424dd8c24da90e9913d3daeb7f16
Author: steve <sn...@twitter.com>
AuthorDate: Tue Apr 21 11:10:59 2020 -0400

    [BEAM-9795] Support custom avro DatumWriters when writing to BigQuery
---
 .../beam/sdk/io/gcp/bigquery/AvroRowWriter.java    | 15 ++---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 75 ++++++++++++++++------
 .../beam/sdk/io/gcp/bigquery/RowWriterFactory.java | 29 ++++++---
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 65 ++++++++++++++++++-
 4 files changed, 145 insertions(+), 39 deletions(-)
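
The user-facing result of this change is a new withAvroWriter method on
BigQueryIO.Write. A minimal usage sketch, adapted from the test added in this
commit (the MyRecord element type, field name, table spec, and jsonSchema
variable are illustrative, not part of the commit):

    // Format function: convert each pipeline element into an avro record.
    SerializableFunction<AvroWriteRequest<MyRecord>, GenericRecord> toAvro =
        request -> {
          GenericRecord rec = new GenericData.Record(request.getSchema());
          rec.put("name", request.getElement().getName());
          return rec;
        };

    // Writer factory: DataFileWriter.create() sets the schema on the writer,
    // so the no-arg GenericDatumWriter constructor is enough here.
    SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>> writerFactory =
        schema -> new GenericDatumWriter<>();

    rows.apply(
        BigQueryIO.<MyRecord>write()
            .to("project:dataset.table")
            .withJsonSchema(jsonSchema) // a table schema MUST be set for avro writes
            .withAvroWriter(toAvro, writerFactory));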

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
index a0509a6..74a0bb4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java
@@ -20,28 +20,27 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.MimeTypes;
 
-class AvroRowWriter<T> extends BigQueryRowWriter<T> {
-  private final DataFileWriter<GenericRecord> writer;
+class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
+  private final DataFileWriter<AvroT> writer;
   private final Schema schema;
-  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord;
+  private final SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord;
 
   AvroRowWriter(
       String basename,
       Schema schema,
-      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toAvroRecord)
+      SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord,
+      SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory)
       throws Exception {
     super(basename, MimeTypes.BINARY);
 
     this.schema = schema;
     this.toAvroRecord = toAvroRecord;
     this.writer =
-        new DataFileWriter<GenericRecord>(new GenericDatumWriter<>())
-            .create(schema, getOutputStream());
+        new DataFileWriter<>(writerFactory.apply(schema)).create(schema, getOutputStream());
   }
 
   @Override
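
With this change AvroRowWriter no longer hard-codes GenericDatumWriter: it is
generic over the avro type AvroT, and the DatumWriter is produced by a
serializable factory applied to the schema. Any DatumWriter implementation can
be plugged in; for example, a factory for Avro specific records might look like
this (a sketch; MyAvroType stands in for a generated specific-record class):

    SerializableFunction<Schema, DatumWriter<MyAvroType>> specificWriterFactory =
        schema -> new SpecificDatumWriter<>(schema);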
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b0399ca..24667e5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -51,7 +51,9 @@ import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -294,14 +296,16 @@ import org.slf4j.LoggerFactory;
  * <ul>
  *   <li>{@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} (recommended) to
  *       write data using avro records.
+ *   <li>{@link BigQueryIO.Write#withAvroWriter} to write avro data using a user-specified {@link
+ *       DatumWriter} (and format function).
  *   <li>{@link BigQueryIO.Write#withFormatFunction(SerializableFunction)} to write data as json
  *       encoded {@link TableRow TableRows}.
  * </ul>
  *
- * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} is used, the table
- * schema MUST be specified using one of the {@link Write#withJsonSchema(String)}, {@link
- * Write#withJsonSchema(ValueProvider)}, {@link Write#withSchemaFromView(PCollectionView)} methods,
- * or {@link Write#to(DynamicDestinations)}.
+ * If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} or {@link
+ * BigQueryIO.Write#withAvroWriter} is used, the table schema MUST be specified using one of the
+ * {@link Write#withJsonSchema(String)}, {@link Write#withJsonSchema(ValueProvider)}, {@link
+ * Write#withSchemaFromView(PCollectionView)} methods, or {@link Write#to(DynamicDestinations)}.
  *
  * <pre>{@code
  * class Quote {
@@ -488,6 +492,9 @@ public class BigQueryIO {
    */
   static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
 
+  static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
+      GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
+
   private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
       DEFAULT_AVRO_SCHEMA_FACTORY =
           new SerializableFunction<TableSchema, org.apache.avro.Schema>() {
@@ -1763,7 +1770,7 @@ public class BigQueryIO {
     abstract SerializableFunction<T, TableRow> getFormatFunction();
 
     @Nullable
-    abstract SerializableFunction<AvroWriteRequest<T>, GenericRecord> getAvroFormatFunction();
+    abstract RowWriterFactory.AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();
 
     @Nullable
     abstract SerializableFunction<TableSchema, org.apache.avro.Schema> getAvroSchemaFactory();
@@ -1851,8 +1858,8 @@ public class BigQueryIO {
 
       abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
 
-      abstract Builder<T> setAvroFormatFunction(
-          SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction);
+      abstract Builder<T> setAvroRowWriterFactory(
+          RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
 
       abstract Builder<T> setAvroSchemaFactory(
           SerializableFunction<TableSchema, org.apache.avro.Schema> avroSchemaFactory);
@@ -2056,13 +2063,43 @@ public class BigQueryIO {
     }
 
     /**
-     * Formats the user's type into a {@link GenericRecord} to be written to BigQuery.
+     * Formats the user's type into a {@link GenericRecord} to be written to BigQuery. The
+     * GenericRecords are written as avro using the standard {@link GenericDatumWriter}.
      *
      * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
      */
     public Write<T> withAvroFormatFunction(
         SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction) {
-      return toBuilder().setAvroFormatFunction(avroFormatFunction).setOptimizeWrites(true).build();
+      return withAvroWriter(avroFormatFunction, GENERIC_DATUM_WRITER_FACTORY);
+    }
+
+    /**
+     * Writes the user's type as avro using {@link DatumWriter}s created by the supplied
+     * writerFactory.
+     *
+     * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
+     *
+     * <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
+     */
+    public Write<T> withAvroWriter(
+        SerializableFunction<org.apache.avro.Schema, DatumWriter<T>> writerFactory) {
+      return withAvroWriter(AvroWriteRequest::getElement, writerFactory);
+    }
+
+    /**
+     * Converts the user's type to an avro record using the supplied avroFormatFunction. Records
+     * are then written using writer instances obtained from the supplied writerFactory.
+     *
+     * <p>This is mutually exclusive with {@link #withFormatFunction}, only one may be set.
+     *
+     * <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
+     */
+    public <AvroT> Write<T> withAvroWriter(
+        SerializableFunction<AvroWriteRequest<T>, AvroT> avroFormatFunction,
+        SerializableFunction<org.apache.avro.Schema, DatumWriter<AvroT>> writerFactory) {
+      return toBuilder()
+          .setOptimizeWrites(true)
+          .setAvroRowWriterFactory(RowWriterFactory.avroRecords(avroFormatFunction, writerFactory))
+          .build();
     }
 
     /**
@@ -2484,7 +2521,7 @@ public class BigQueryIO {
       if (method != Method.FILE_LOADS) {
         // we only support writing avro for FILE_LOADS
         checkArgument(
-            getAvroFormatFunction() == null,
+            getAvroRowWriterFactory() == null,
             "Writing avro formatted data is only supported for FILE_LOADS, however "
                 + "the method was %s",
             method);
@@ -2546,8 +2583,8 @@ public class BigQueryIO {
         PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
       boolean optimizeWrites = getOptimizeWrites();
       SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
-      SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction =
-          getAvroFormatFunction();
+      RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> avroRowWriterFactory =
+          (RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) getAvroRowWriterFactory();
 
       boolean hasSchema =
           getJsonSchema() != null
@@ -2559,8 +2596,8 @@ public class BigQueryIO {
         optimizeWrites = true;
 
         checkArgument(
-            avroFormatFunction == null,
-            "avroFormatFunction is unsupported when using Beam schemas.");
+            avroRowWriterFactory == null,
+            "avro avroFormatFunction is unsupported when using Beam schemas.");
 
         if (formatFunction == null) {
           // If no format function set, then we will automatically convert the input type to a
@@ -2593,10 +2630,10 @@ public class BigQueryIO {
       Method method = resolveMethod(input);
       if (optimizeWrites) {
         RowWriterFactory<T, DestinationT> rowWriterFactory;
-        if (avroFormatFunction != null) {
+        if (avroRowWriterFactory != null) {
           checkArgument(
               formatFunction == null,
-              "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both.");
+              "Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set, not both.");
 
           SerializableFunction<TableSchema, org.apache.avro.Schema> avroSchemaFactory =
               getAvroSchemaFactory();
@@ -2607,9 +2644,7 @@ public class BigQueryIO {
                     + "is set but no avroSchemaFactory is defined.");
             avroSchemaFactory = DEFAULT_AVRO_SCHEMA_FACTORY;
           }
-          rowWriterFactory =
-              RowWriterFactory.avroRecords(
-                  avroFormatFunction, avroSchemaFactory, dynamicDestinations);
+          rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations, avroSchemaFactory);
         } else if (formatFunction != null) {
           rowWriterFactory = RowWriterFactory.tableRows(formatFunction);
         } else {
@@ -2634,7 +2669,7 @@ public class BigQueryIO {
             rowWriterFactory,
             method);
       } else {
-        checkArgument(avroFormatFunction == null);
+        checkArgument(avroRowWriterFactory == null);
         checkArgument(
             formatFunction != null,
             "A function must be provided to convert the input type into a TableRow or "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
index d8e4ea6b..7229957 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
@@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.Serializable;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
 abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable {
@@ -74,29 +74,38 @@ abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable
     }
   }
 
-  static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT> avroRecords(
-      SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro,
-      SerializableFunction<TableSchema, Schema> schemaFactory,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
-    return new AvroRowWriterFactory<>(toAvro, schemaFactory, dynamicDestinations);
+  static <ElementT, AvroT, DestinationT>
+      AvroRowWriterFactory<ElementT, AvroT, DestinationT> avroRecords(
+          SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro,
+          SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory) {
+    return new AvroRowWriterFactory<>(toAvro, writerFactory, null, null);
   }
 
-  private static final class AvroRowWriterFactory<ElementT, DestinationT>
+  static final class AvroRowWriterFactory<ElementT, AvroT, DestinationT>
       extends RowWriterFactory<ElementT, DestinationT> {
 
-    private final SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro;
+    private final SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro;
+    private final SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory;
     private final SerializableFunction<TableSchema, Schema> schemaFactory;
     private final DynamicDestinations<?, DestinationT> dynamicDestinations;
 
     private AvroRowWriterFactory(
-        SerializableFunction<AvroWriteRequest<ElementT>, GenericRecord> toAvro,
+        SerializableFunction<AvroWriteRequest<ElementT>, AvroT> toAvro,
+        SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory,
         SerializableFunction<TableSchema, Schema> schemaFactory,
         DynamicDestinations<?, DestinationT> dynamicDestinations) {
       this.toAvro = toAvro;
+      this.writerFactory = writerFactory;
       this.schemaFactory = schemaFactory;
       this.dynamicDestinations = dynamicDestinations;
     }
 
+    AvroRowWriterFactory<ElementT, AvroT, DestinationT> prepare(
+        DynamicDestinations<?, DestinationT> dynamicDestinations,
+        SerializableFunction<TableSchema, Schema> schemaFactory) {
+      return new AvroRowWriterFactory<>(toAvro, writerFactory, schemaFactory, dynamicDestinations);
+    }
+
     @Override
     OutputType getOutputType() {
       return OutputType.AvroGenericRecord;
@@ -107,7 +116,7 @@ abstract class RowWriterFactory<ElementT, DestinationT> implements Serializable
         throws Exception {
       TableSchema tableSchema = dynamicDestinations.getSchema(destination);
       Schema avroSchema = schemaFactory.apply(tableSchema);
-      return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro);
+      return new AvroRowWriter<>(tempFilePrefix, avroSchema, toAvro, writerFactory);
     }
 
     @Override
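
AvroRowWriterFactory is now constructed in two phases: avroRecords() captures
only the user-supplied pieces at pipeline construction time, and prepare()
fills in the schema factory and destinations once expandTyped has resolved
them (see the prepare call in the BigQueryIO diff above). Roughly:

    // Phase 1: at withAvroWriter() time -- only user inputs are known.
    AvroRowWriterFactory<ElementT, AvroT, DestinationT> partial =
        RowWriterFactory.avroRecords(toAvro, writerFactory);

    // Phase 2: inside expandTyped(), once destinations and schema factory exist.
    RowWriterFactory<ElementT, DestinationT> complete =
        partial.prepare(dynamicDestinations, avroSchemaFactory);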
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 0e2064c..f3b9662 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -65,7 +65,10 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -783,6 +786,66 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   @Test
+  public void testWriteAvroWithCustomWriter() throws Exception {
+    SerializableFunction<AvroWriteRequest<InputRecord>, GenericRecord> formatFunction =
+        r -> {
+          GenericRecord rec = new GenericData.Record(r.getSchema());
+          InputRecord i = r.getElement();
+          rec.put("strVal", i.strVal());
+          rec.put("longVal", i.longVal());
+          rec.put("doubleVal", i.doubleVal());
+          rec.put("instantVal", i.instantVal().getMillis() * 1000);
+          return rec;
+        };
+
+    SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>> customWriterFactory =
+        s ->
+            new GenericDatumWriter<GenericRecord>() {
+              @Override
+              protected void writeString(org.apache.avro.Schema schema, Object datum, Encoder out)
+                  throws IOException {
+                super.writeString(schema, datum.toString() + "_custom", out);
+              }
+            };
+
+    p.apply(
+            Create.of(
+                    InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")),
+                    InputRecord.create("test2", 2, 2.0, Instant.parse("2019-02-01T00:00:00Z")))
+                .withCoder(INPUT_RECORD_CODER))
+        .apply(
+            BigQueryIO.<InputRecord>write()
+                .to("dataset-id.table-id")
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("strVal").setType("STRING"),
+                                new TableFieldSchema().setName("longVal").setType("INTEGER"),
+                                new TableFieldSchema().setName("doubleVal").setType("FLOAT"),
+                                new TableFieldSchema().setName("instantVal").setType("TIMESTAMP"))))
+                .withTestServices(fakeBqServices)
+                .withAvroWriter(formatFunction, customWriterFactory)
+                .withoutValidation());
+    p.run();
+
+    assertThat(
+        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(
+            new TableRow()
+                .set("strVal", "test_custom")
+                .set("longVal", "1")
+                .set("doubleVal", 1.0D)
+                .set("instantVal", "2019-01-01 00:00:00 UTC"),
+            new TableRow()
+                .set("strVal", "test2_custom")
+                .set("longVal", "2")
+                .set("doubleVal", 2.0D)
+                .set("instantVal", "2019-02-01 00:00:00 UTC")));
+  }
+
+  @Test
   public void testStreamingWrite() throws Exception {
     p.apply(
             Create.of(
@@ -1352,7 +1415,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "Only one of withFormatFunction or withAvroFormatFunction maybe set, not both");
+        "Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set, not both.");
     p.apply(Create.empty(INPUT_RECORD_CODER))
         .apply(
             BigQueryIO.<InputRecord>write()