Posted to commits@beam.apache.org by sn...@apache.org on 2022/10/06 16:42:19 UTC

[beam] branch master updated: Support custom avro DatumReader when reading from BigQuery (#22718)

This is an automated email from the ASF dual-hosted git repository.

sniemitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bdf3579150 Support custom avro DatumReader when reading from BigQuery (#22718)
8bdf3579150 is described below

commit 8bdf35791507505f698c97502a075f0e5822a2b0
Author: Kanishk Karanawat <kk...@gmail.com>
AuthorDate: Thu Oct 6 12:42:07 2022 -0400

    Support custom avro DatumReader when reading from BigQuery (#22718)
    
    Co-authored-by: Kanishk Karanawat <kk...@twitter.com>
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 125 ++++++++++++++++++-
 .../sdk/io/gcp/bigquery/BigQueryQuerySource.java   |  10 +-
 .../io/gcp/bigquery/BigQueryQuerySourceDef.java    |   5 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    |  44 +++----
 .../sdk/io/gcp/bigquery/BigQuerySourceDef.java     |   6 +-
 .../sdk/io/gcp/bigquery/BigQueryTableSource.java   |  10 +-
 .../io/gcp/bigquery/BigQueryTableSourceDef.java    |   5 +-
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 132 ++++++++++++++++++++-
 8 files changed, 282 insertions(+), 55 deletions(-)
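
For a sense of the API this adds: TypedRead now carries a
SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> in place of
a bare parseFn, and the new BigQueryIO.readWithDatumReader(...) entry point lets
callers plug in their own Avro DatumReader. A minimal usage sketch follows;
ClickEvent, the table spec, and the coder choice are illustrative assumptions,
not part of this commit:

    // Deserialize BigQuery's Avro export directly into a POJO via ReflectDatumReader.
    // ClickEvent is a hypothetical Serializable POJO whose fields match the table.
    PCollection<ClickEvent> events =
        p.apply(
            BigQueryIO.readWithDatumReader(
                    (AvroSource.DatumReaderFactory<ClickEvent>)
                        (writer, reader) ->
                            new ReflectDatumReader<>(ReflectData.get().getSchema(ClickEvent.class)))
                .from("my-project:my_dataset.click_events")
                .withCoder(SerializableCoder.of(ClickEvent.class)));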

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 024dcb053b5..760609988d5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -45,6 +45,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +54,12 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -69,6 +73,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MoveOptions;
@@ -122,10 +127,13 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -573,6 +581,57 @@ public class BigQueryIO {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  private static class TableSchemaFunction
+      implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
+    @Override
+    public @Nullable TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
+  @VisibleForTesting
+  static class GenericDatumTransformer<T> implements DatumReader<T> {
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final Supplier<TableSchema> tableSchema;
+    private GenericDatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+    private org.apache.avro.Schema readerSchema;
+
+    public GenericDatumTransformer(
+        SerializableFunction<SchemaAndRecord, T> parseFn,
+        String tableSchema,
+        org.apache.avro.Schema writer,
+        org.apache.avro.Schema reader) {
+      this.parseFn = parseFn;
+      this.tableSchema =
+          Suppliers.memoize(
+              Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
+      this.writerSchema = writer;
+      this.readerSchema = reader;
+      this.reader = new GenericDatumReader<>(this.writerSchema, this.readerSchema);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      if (this.writerSchema.equals(schema)) {
+        return;
+      }
+
+      this.writerSchema = schema;
+      if (this.readerSchema == null) {
+        this.readerSchema = schema;
+      }
+
+      this.reader = new GenericDatumReader<>(this.writerSchema, this.readerSchema);
+    }
+
+    @Override
+    public T read(T reuse, Decoder in) throws IOException {
+      GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
+      return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
+    }
+  }
+
   /**
   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
   * row of the table or query result, parsed from the BigQuery AVRO format using the specified
@@ -598,6 +657,21 @@ public class BigQueryIO {
         .setValidate(true)
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
+        .setDatumReaderFactory(
+            (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
+                input -> {
+                  try {
+                    String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
+                    return (AvroSource.DatumReaderFactory<T>)
+                        (writer, reader) ->
+                            new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer, reader);
+                  } catch (IOException e) {
+                    LOG.warn(
+                        String.format("Error while converting table schema %s to JSON!", input), e);
+                    return null;
+                  }
+                })
+        // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
         .setParseFn(parseFn)
         .setMethod(TypedRead.Method.DEFAULT)
         .setUseAvroLogicalTypes(false)
@@ -606,6 +680,34 @@ public class BigQueryIO {
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * row of the table or query result. This API deserializes BigQuery AVRO data directly into the
+   * output class, using the supplied {@link org.apache.avro.io.DatumReader}.
+   *
+   * <pre>{@code
+   * class ClickEvent { long userId; String url; ... }
+   *
+   * p.apply(BigQueryIO.readWithDatumReader((AvroSource.DatumReaderFactory<ClickEvent>)
+   *     (writer, reader) -> new ReflectDatumReader<>(ReflectData.get().getSchema(ClickEvent.class))).from("..."));
+   * }</pre>
+   */
+  public static <T> TypedRead<T> readWithDatumReader(
+      AvroSource.DatumReaderFactory<T> readerFactory) {
+    return new AutoValue_BigQueryIO_TypedRead.Builder<T>()
+        .setValidate(true)
+        .setWithTemplateCompatibility(false)
+        .setBigQueryServices(new BigQueryServicesImpl())
+        .setDatumReaderFactory(
+            (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
+                input -> readerFactory)
+        .setMethod(TypedRead.Method.DEFAULT)
+        .setUseAvroLogicalTypes(false)
+        .setFormat(DataFormat.AVRO)
+        .setProjectionPushdownApplied(false)
+        .build();
+  }
+
   @VisibleForTesting
   static class TableRowParser implements SerializableFunction<SchemaAndRecord, TableRow> {
 
@@ -804,6 +906,9 @@ public class BigQueryIO {
 
       abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
 
+      abstract Builder<T> setDatumReaderFactory(
+          SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> factoryFn);
+
       abstract Builder<T> setCoder(Coder<T> coder);
 
       abstract Builder<T> setKmsKey(String kmsKey);
@@ -836,7 +941,10 @@ public class BigQueryIO {
 
     abstract BigQueryServices getBigQueryServices();
 
-    abstract SerializableFunction<SchemaAndRecord, T> getParseFn();
+    abstract @Nullable SerializableFunction<SchemaAndRecord, T> getParseFn();
+
+    abstract @Nullable SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>
+        getDatumReaderFactory();
 
     abstract @Nullable QueryPriority getQueryPriority();
 
@@ -1065,7 +1173,8 @@ public class BigQueryIO {
             getFlattenResults() != null, "flattenResults should not be null if query is set");
         checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
       }
-      checkArgument(getParseFn() != null, "A parseFn is required");
+
+      checkArgument(getDatumReaderFactory() != null, "A datumReaderFactory is required");
 
       // if both toRowFn and fromRowFn values are set, enable Beam schema support
       Pipeline p = input.getPipeline();
@@ -1108,7 +1217,7 @@ public class BigQueryIO {
             p.apply(
                 org.apache.beam.sdk.io.Read.from(
                     sourceDef.toSource(
-                        staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
+                        staticJobUuid, coder, getDatumReaderFactory(), getUseAvroLogicalTypes())));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -1136,7 +1245,10 @@ public class BigQueryIO {
                             String jobUuid = c.element();
                             BigQuerySourceBase<T> source =
                                 sourceDef.toSource(
-                                    jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
+                                    jobUuid,
+                                    coder,
+                                    getDatumReaderFactory(),
+                                    getUseAvroLogicalTypes());
                             BigQueryOptions options =
                                 c.getPipelineOptions().as(BigQueryOptions.class);
                             ExtractResult res = source.extractFiles(options);
@@ -1169,7 +1281,10 @@ public class BigQueryIO {
                                 String jobUuid = c.sideInput(jobIdTokenView);
                                 BigQuerySourceBase<T> source =
                                     sourceDef.toSource(
-                                        jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
+                                        jobUuid,
+                                        coder,
+                                        getDatumReaderFactory(),
+                                        getUseAvroLogicalTypes());
                                 List<BoundedSource<T>> sources =
                                     source.createSources(
                                         ImmutableList.of(
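
Because readWithDatumReader ignores the TableSchema input and the source (see
BigQuerySourceBase below) supplies both writer and reader schemas to the factory,
one thing this hook enables is keeping GenericRecord end to end, with no
SchemaAndRecord/TableRow detour. A sketch, assuming a table that exports a single
optional "name" column (schema and table spec are illustrative):

    // Stand-in Avro schema; in practice it must match the table's export schema.
    org.apache.avro.Schema avroSchema =
        org.apache.avro.SchemaBuilder.record("root").fields().optionalString("name").endRecord();

    PCollection<GenericRecord> rows =
        p.apply(
            BigQueryIO.readWithDatumReader(
                    (AvroSource.DatumReaderFactory<GenericRecord>)
                        (writer, reader) ->
                            // Defensive fallback to the writer schema if no reader schema is given.
                            new GenericDatumReader<>(writer, reader == null ? writer : reader))
                .from("my-project:my_dataset.my_table")
                .withCoder(AvroCoder.of(avroSchema)));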
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 4ca2c647d03..8e3b437bec5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -18,8 +18,10 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -34,10 +36,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
     return new BigQueryQuerySource<>(
-        stepUuid, queryDef, bqServices, coder, parseFn, useAvroLogicalTypes);
+        stepUuid, queryDef, bqServices, coder, readerFactory, useAvroLogicalTypes);
   }
 
   private final BigQueryQuerySourceDef queryDef;
@@ -47,9 +49,9 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
-    super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
+    super(stepUuid, bqServices, coder, readerFactory, useAvroLogicalTypes);
     this.queryDef = queryDef;
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 1d06c819ccf..606d1d3ad88 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
@@ -153,10 +154,10 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
   public <T> BigQuerySourceBase<T> toSource(
       String stepUuid,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
     return BigQueryQuerySource.create(
-        stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
+        stepUuid, this, bqServices, coder, readerFactory, useAvroLogicalTypes);
   }
 
   /** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 18beda5c1c6..1f16fb44049 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -28,9 +28,7 @@ import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -41,9 +39,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -74,7 +69,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   protected final BigQueryServices bqServices;
 
   private transient @Nullable List<BoundedSource<T>> cachedSplitResult = null;
-  private SerializableFunction<SchemaAndRecord, T> parseFn;
+  private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory;
   private Coder<T> coder;
   private final boolean useAvroLogicalTypes;
 
@@ -82,12 +77,12 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
       String stepUuid,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
     this.stepUuid = checkArgumentNotNull(stepUuid, "stepUuid");
     this.bqServices = checkArgumentNotNull(bqServices, "bqServices");
     this.coder = checkArgumentNotNull(coder, "coder");
-    this.parseFn = checkArgumentNotNull(parseFn, "parseFn");
+    this.readerFactory = checkArgumentNotNull(readerFactory, "readerFactory");
     this.useAvroLogicalTypes = useAvroLogicalTypes;
   }
 
@@ -239,41 +234,30 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
     return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
-  private static class TableSchemaFunction
-      implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
-    @Override
-    public @Nullable TableSchema apply(@Nullable String input) {
-      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
-    }
-  }
-
   List<BoundedSource<T>> createSources(
       List<ResourceId> files, TableSchema schema, @Nullable List<MatchResult.Metadata> metadata)
       throws IOException, InterruptedException {
+    String avroSchema =
+        BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
 
-    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
-    SerializableFunction<GenericRecord, T> fnWrapper =
-        new SerializableFunction<GenericRecord, T>() {
-          private Supplier<TableSchema> schema =
-              Suppliers.memoize(
-                  Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
-
-          @Override
-          public T apply(GenericRecord input) {
-            return parseFn.apply(new SchemaAndRecord(input, schema.get()));
-          }
-        };
+    AvroSource.DatumReaderFactory<T> factory = readerFactory.apply(schema);
 
     List<BoundedSource<T>> avroSources = Lists.newArrayList();
     // If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
     // mode.
     if (metadata != null) {
       for (MatchResult.Metadata file : metadata) {
-        avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
+        avroSources.add(
+            (AvroSource<T>)
+                AvroSource.from(file).withSchema(avroSchema).withDatumReaderFactory(factory));
       }
     } else {
       for (ResourceId file : files) {
-        avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+        avroSources.add(
+            (AvroSource<T>)
+                AvroSource.from(file.toString())
+                    .withSchema(avroSchema)
+                    .withDatumReaderFactory(factory));
       }
     }
     return ImmutableList.copyOf(avroSources);
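
createSources(...) above resolves the table's generic Avro schema once and hands
the factory to AvroSource, which later invokes it with each file's writer schema
plus that reader schema. A custom DatumReader<T> can therefore delegate schema
resolution to GenericDatumReader, much as GenericDatumTransformer does for the
parseFn path; a sketch with an illustrative MyRow value class:

    // MyRow and its "name" field are assumptions for the sake of the example.
    class MyRow implements java.io.Serializable {
      final String name;
      MyRow(String name) { this.name = name; }
    }

    class MyRowReader implements DatumReader<MyRow> {
      private final GenericDatumReader<GenericRecord> delegate;

      MyRowReader(org.apache.avro.Schema writer, org.apache.avro.Schema reader) {
        this.delegate = new GenericDatumReader<>(writer, reader == null ? writer : reader);
      }

      @Override
      public void setSchema(org.apache.avro.Schema schema) {
        delegate.setSchema(schema); // AvroSource passes the file's writer schema here.
      }

      @Override
      public MyRow read(MyRow reuse, Decoder in) throws IOException {
        GenericRecord record = delegate.read(null, in);
        return new MyRow(String.valueOf(record.get("name")));
      }
    }

    // Wired in as: (writer, reader) -> new MyRowReader(writer, reader)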
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
index 83006181df1..9532a2f4d6f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableSchema;
 import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
@@ -34,7 +36,7 @@ interface BigQuerySourceDef extends Serializable {
    *
    * @param stepUuid Job UUID
    * @param coder Coder
-   * @param parseFn Parse function
+   * @param readerFactory Reader factory
   * @param useAvroLogicalTypes Use avro logical types, e.g. DATE, TIME
    * @param <T> Type of the resulting PCollection
    * @return An implementation of {@link BigQuerySourceBase}
@@ -42,7 +44,7 @@ interface BigQuerySourceDef extends Serializable {
   <T> BigQuerySourceBase<T> toSource(
       String stepUuid,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes);
 
   /**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f944ae673b6..2d274ed0e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -38,10 +40,10 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
       BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
     return new BigQueryTableSource<>(
-        stepUuid, tableDef, bqServices, coder, parseFn, useAvroLogicalTypes);
+        stepUuid, tableDef, bqServices, coder, readerFactory, useAvroLogicalTypes);
   }
 
   private final BigQueryTableSourceDef tableDef;
@@ -52,9 +54,9 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
       BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
-    super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
+    super(stepUuid, bqServices, coder, readerFactory, useAvroLogicalTypes);
     this.tableDef = tableDef;
     this.tableSizeBytes = new AtomicReference<>();
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 72de3f56e6c..e78ea0b5d7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
@@ -95,10 +96,10 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
   public <T> BigQuerySourceBase<T> toSource(
       String stepUuid,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn,
+      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes) {
     return BigQueryTableSource.create(
-        stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
+        stepUuid, this, bqServices, coder, readerFactory, useAvroLogicalTypes);
   }
 
   /** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index f2114c8eb07..75202b4076b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -40,11 +40,15 @@ import java.math.BigInteger;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
@@ -130,6 +134,21 @@ public class BigQueryIOReadTest implements Serializable {
           .withDatasetService(fakeDatasetService)
           .withJobService(fakeJobService);
 
+  private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
+      datumReaderFactoryFn =
+          (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
+              input -> {
+                try {
+                  String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
+                  return (AvroSource.DatumReaderFactory<TableRow>)
+                      (writer, reader) ->
+                          new BigQueryIO.GenericDatumTransformer<>(
+                              BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer, reader);
+                } catch (IOException e) {
+                  return null;
+                }
+              };
+
   private void checkSetsProject(String projectId) throws Exception {
     fakeDatasetService.createDataset(projectId, "dataset-id", "", "", null);
     String tableId = "sometable";
@@ -490,6 +509,107 @@ public class BigQueryIOReadTest implements Serializable {
     p.run();
   }
 
+  static class User extends SpecificRecordBase {
+    private static final org.apache.avro.Schema schema =
+        org.apache.avro.SchemaBuilder.record("User")
+            .namespace("org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOReadTest$")
+            .fields()
+            .optionalString("name")
+            .endRecord();
+
+    private String name;
+
+    public String getName() {
+      return this.name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public User() {}
+
+    @Override
+    public void put(int i, Object v) {
+      if (i == 0) {
+        setName(((org.apache.avro.util.Utf8) v).toString());
+      }
+    }
+
+    @Override
+    public Object get(int i) {
+      if (i == 0) {
+        return getName();
+      }
+      return null;
+    }
+
+    @Override
+    public org.apache.avro.Schema getSchema() {
+      return schema;
+    }
+
+    public static org.apache.avro.Schema getAvroSchema() {
+      return schema;
+    }
+  }
+
+  @Test
+  public void testReadTableWithReaderDatumFactory() throws IOException, InterruptedException {
+    // setup
+    Table someTable = new Table();
+    someTable.setSchema(
+        new TableSchema()
+            .setFields(ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"))));
+    someTable.setTableReference(
+        new TableReference()
+            .setProjectId("non-executing-project")
+            .setDatasetId("schema_dataset")
+            .setTableId("schema_table"));
+    someTable.setNumBytes(1024L * 1024L);
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null);
+    fakeDatasetService.createTable(someTable);
+
+    List<TableRow> records =
+        Lists.newArrayList(
+            new TableRow().set("name", "a"),
+            new TableRow().set("name", "b"),
+            new TableRow().set("name", "c"),
+            new TableRow().set("name", "d"));
+
+    fakeDatasetService.insertAll(someTable.getTableReference(), records, null);
+
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(fakeDatasetService);
+
+    BigQueryIO.TypedRead<User> read =
+        BigQueryIO.readWithDatumReader(
+                (AvroSource.DatumReaderFactory<User>)
+                    (writer, reader) -> new SpecificDatumReader<>(User.getAvroSchema()))
+            .from("non-executing-project:schema_dataset.schema_table")
+            .withTestServices(fakeBqServices)
+            .withoutValidation()
+            .withCoder(SerializableCoder.of(User.class));
+
+    PCollection<User> bqRows = p.apply(read);
+
+    User a = new User();
+    a.setName("a");
+    User b = new User();
+    b.setName("b");
+    User c = new User();
+    c.setName("c");
+    User d = new User();
+    d.setName("d");
+
+    PAssert.that(bqRows).containsInAnyOrder(ImmutableList.of(a, b, c, d));
+
+    p.run();
+  }
+
   @Test
   public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
@@ -558,7 +678,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -607,7 +727,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
@@ -645,7 +765,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
@@ -676,7 +796,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     fakeJobService.expectDryRunQuery(
         bqOptions.getProject(),
@@ -752,7 +872,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
@@ -818,7 +938,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+            .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());