You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/28 17:53:06 UTC

[2/4] beam git commit: [BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read

[BEAM-2677] AvroIO.parseGenericRecords - schemaless AvroIO.read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ebd00411
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ebd00411
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ebd00411

Branch: refs/heads/master
Commit: ebd004119c387787d0e0fcd0487e1b2754c7dbc5
Parents: 62c922b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jul 24 15:07:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 10:25:07 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 199 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  89 ++++++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  30 ++-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   4 +-
 5 files changed, 406 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 018b84f..27c9073 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -35,7 +35,9 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
@@ -53,13 +55,16 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
- * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
+ * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
+ * themselves in a {@link PCollection}, apply {@link #readAll}.
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
@@ -70,6 +75,12 @@ import org.apache.beam.sdk.values.PDone;
  * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
  * #readAllGenericRecords}.
  *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
  * <p>For example:
  *
  * <pre>{@code
@@ -84,12 +95,20 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
  * }</pre>
  *
  * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
+ * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
+ * performance if the filepattern matches only a small number of files.
  *
  * <p>Reading from a {@link PCollection} of filepatterns:
  *
@@ -101,6 +120,8 @@ import org.apache.beam.sdk.values.PDone;
  *     filepatterns.apply(AvroIO.read(AvroAutoGenClass.class));
  * PCollection<GenericRecord> genericRecords =
  *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
@@ -208,6 +229,29 @@ public class AvroIO {
   }
 
   /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   */
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
+  /**
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
@@ -387,6 +431,149 @@ public class AvroIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Implementation of {@link #parseGenericRecords}. */
+  @AutoValue
+  public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+    @Nullable abstract Coder<T> getCoder();
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+      abstract Parse<T> build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Parse<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #from(String)}. */
+    public Parse<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets a coder for the result of the parse function. */
+    public Parse<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Like {@link Read#withHintMatchesManyFiles()}. */
+    public Parse<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      if (getHintMatchesManyFiles()) {
+        return input
+            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+            .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+      }
+      return input.apply(
+          org.apache.beam.sdk.io.Read.from(
+              AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+    }
+
+    private static <T> Coder<T> inferCoder(
+        @Nullable Coder<T> explicitCoder,
+        SerializableFunction<GenericRecord, T> parseFn,
+        CoderRegistry coderRegistry) {
+      if (explicitCoder != null) {
+        return explicitCoder;
+      }
+      // If a coder was not specified explicitly, infer it from parse fn.
+      TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
+      String message =
+          "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
+      checkArgument(descriptor != null, message);
+      try {
+        return coderRegistry.getCoder(descriptor);
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseAllGenericRecords}. */
+  @AutoValue
+  public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+    @Nullable abstract Coder<T> getCoder();
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseAll<T> build();
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseAll<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @VisibleForTesting
+    ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      final Coder<T> coder =
+          Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      SerializableFunction<String, FileBasedSource<T>> createSource =
+          new SerializableFunction<String, FileBasedSource<T>>() {
+            @Override
+            public FileBasedSource<T> apply(String input) {
+              return AvroSource.from(input).withParseFn(getParseFn(), coder);
+            }
+          };
+      return input
+          .apply(
+              "Parse all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+                  getDesiredBundleSizeBytes(),
+                  createSource))
+          .setCoder(coder);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index a98d870..d277503 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -27,8 +28,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
 import java.io.ObjectStreamException;
 import java.io.PushbackInputStream;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
@@ -53,10 +56,12 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -130,19 +135,84 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // The default sync interval is 64k.
   private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
-  // The type of the records contained in the file.
-  private final Class<T> type;
+  // Use cases of AvroSource are:
+  // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
+  // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
+  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+  //    and converting them to type T.
+  //                     |    Case 1     |    Case 2   |     Case 3    |
+  // type                | GenericRecord |     Foo     | GenericRecord |
+  // readerSchemaString  |    non-null   |   non-null  |     null      |
+  // parseFn             |      null     |     null    |   non-null    |
+  // outputCoder         |      null     |     null    |   non-null    |
+  private static class Mode<T> implements Serializable {
+    private final Class<?> type;
+
+    // The JSON schema used to decode records.
+    @Nullable
+    private String readerSchemaString;
+
+    @Nullable
+    private final SerializableFunction<GenericRecord, T> parseFn;
+
+    @Nullable
+    private final Coder<T> outputCoder;
+
+    private Mode(
+        Class<?> type,
+        @Nullable String readerSchemaString,
+        @Nullable SerializableFunction<GenericRecord, T> parseFn,
+        @Nullable Coder<T> outputCoder) {
+      this.type = type;
+      this.readerSchemaString = internSchemaString(readerSchemaString);
+      this.parseFn = parseFn;
+      this.outputCoder = outputCoder;
+    }
 
-  // The JSON schema used to decode records.
-  @Nullable
-  private final String readerSchemaString;
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+      is.defaultReadObject();
+      readerSchemaString = internSchemaString(readerSchemaString);
+    }
+
+    private Coder<T> getOutputCoder() {
+      if (parseFn == null) {
+        return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));
+      } else {
+        return outputCoder;
+      }
+    }
+
+    private void validate() {
+      if (parseFn == null) {
+        checkArgument(
+            readerSchemaString != null,
+            "schema must be specified using withSchema() when not using a parse fn");
+      }
+    }
+  }
+
+  private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {
+    return new Mode<>(GenericRecord.class, schema, null, null);
+  }
+  private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {
+    return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);
+  }
+  private static <T> Mode<T> parseGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {
+    return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);
+  }
+
+  private final Mode<T> mode;
 
   /**
-   * Reads from the given file name or pattern ("glob"). The returned source can be further
+   * Reads from the given file name or pattern ("glob"). The returned source needs to be further
    * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
-    return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
+    return new AvroSource<>(
+        fileNameOrPattern,
+        DEFAULT_MIN_BUNDLE_SIZE,
+        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
   }
 
   /** Like {@link #from(ValueProvider)}. */
@@ -152,23 +222,40 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
+    checkNotNull(schema, "schema");
     return new AvroSource<>(
-        getFileOrPatternSpecProvider(), getMinBundleSize(), schema, GenericRecord.class);
+        getFileOrPatternSpecProvider(),
+        getMinBundleSize(),
+        readGenericRecordsWithSchema(schema));
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
-    return new AvroSource<>(
-        getFileOrPatternSpecProvider(), getMinBundleSize(), schema.toString(), GenericRecord.class);
+    checkNotNull(schema, "schema");
+    return withSchema(schema.toString());
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
+    checkNotNull(clazz, "clazz");
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(),
+        getMinBundleSize(),
+        readGeneratedClasses(clazz));
+  }
+
+  /**
+   * Reads {@link GenericRecord} of unspecified schema and maps them to instances of a custom type
+   * using the given {@code parseFn} and encoded using the given coder.
+   */
+  public <X> AvroSource<X> withParseFn(
+      SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
+    checkNotNull(parseFn, "parseFn");
+    checkNotNull(parseFn, "coder");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
         getMinBundleSize(),
-        ReflectData.get().getSchema(clazz).toString(),
-        clazz);
+        parseGenericRecords(parseFn, coder));
   }
 
   /**
@@ -176,19 +263,16 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(
-        getFileOrPatternSpecProvider(), minBundleSize, readerSchemaString, type);
+    return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
       ValueProvider<String> fileNameOrPattern,
       long minBundleSize,
-      String readerSchemaString,
-      Class<T> type) {
+      Mode<T> mode) {
     super(fileNameOrPattern, minBundleSize);
-    this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.type = type;
+    this.mode = mode;
   }
 
   /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -197,18 +281,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long minBundleSize,
       long startOffset,
       long endOffset,
-      String readerSchemaString,
-      Class<T> type) {
+      Mode<T> mode) {
     super(metadata, minBundleSize, startOffset, endOffset);
-    this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.type = type;
+    this.mode = mode;
   }
 
   @Override
   public void validate() {
-    // AvroSource objects do not need to be configured with more than a file pattern. Overridden to
-    // make this explicit.
     super.validate();
+    mode.validate();
   }
 
   /**
@@ -225,7 +306,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
-    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
+    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);
   }
 
   @Override
@@ -234,14 +315,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   }
 
   @Override
-  public AvroCoder<T> getDefaultOutputCoder() {
-    return AvroCoder.of(type, internOrParseSchemaString(readerSchemaString));
+  public Coder<T> getDefaultOutputCoder() {
+    return mode.getOutputCoder();
   }
 
   @VisibleForTesting
   @Nullable
   String getReaderSchemaString() {
-    return readerSchemaString;
+    return mode.readerSchemaString;
   }
 
   /** Avro file metadata. */
@@ -380,15 +461,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     switch (getMode()) {
       case SINGLE_FILE_OR_SUBRANGE:
         return new AvroSource<>(
-            getSingleFileMetadata(),
-            getMinBundleSize(),
-            getStartOffset(),
-            getEndOffset(),
-            readerSchemaString,
-            type);
+            getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
       case FILEPATTERN:
-        return new AvroSource<>(
-            getFileOrPatternSpecProvider(), getMinBundleSize(), readerSchemaString, type);
+        return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));
@@ -402,6 +477,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   static class AvroBlock<T> extends Block<T> {
+    private final Mode<T> mode;
+
     // The number of records in the block.
     private final long numRecords;
 
@@ -412,7 +489,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     private long currentRecordIndex = 0;
 
     // A DatumReader to read records from the block.
-    private final DatumReader<T> reader;
+    private final DatumReader<?> reader;
 
     // A BinaryDecoder used by the reader to decode records.
     private final BinaryDecoder decoder;
@@ -455,19 +532,19 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     AvroBlock(
         byte[] data,
         long numRecords,
-        Class<? extends T> type,
-        String readerSchemaString,
+        Mode<T> mode,
         String writerSchemaString,
         String codec)
         throws IOException {
+      this.mode = mode;
       this.numRecords = numRecords;
       checkNotNull(writerSchemaString, "writerSchemaString");
       Schema writerSchema = internOrParseSchemaString(writerSchemaString);
       Schema readerSchema =
           internOrParseSchemaString(
-              MoreObjects.firstNonNull(readerSchemaString, writerSchemaString));
+              MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
       this.reader =
-          (type == GenericRecord.class)
+          (mode.type == GenericRecord.class)
               ? new GenericDatumReader<T>(writerSchema, readerSchema)
               : new ReflectDatumReader<T>(writerSchema, readerSchema);
       this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
@@ -483,7 +560,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       if (currentRecordIndex >= numRecords) {
         return false;
       }
-      currentRecord = reader.read(null, decoder);
+      Object record = reader.read(null, decoder);
+      currentRecord =
+          (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
       currentRecordIndex++;
       return true;
     }
@@ -585,8 +664,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
           new AvroBlock<>(
               data,
               numRecords,
-              getCurrentSource().type,
-              getCurrentSource().readerSchemaString,
+              getCurrentSource().mode,
               metadata.getSchemaString(),
               metadata.getCodec());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 90cd824..154ff5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -114,9 +115,9 @@ public class AvroIOTest {
 
     public GenericClass() {}
 
-    public GenericClass(int intValue, String stringValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
+    public GenericClass(int intField, String stringField) {
+      this.intField = intField;
+      this.stringField = stringField;
     }
 
     @Override
@@ -142,9 +143,18 @@ public class AvroIOTest {
     }
   }
 
+  private static class ParseGenericClass
+      implements SerializableFunction<GenericRecord, GenericClass> {
+    @Override
+    public GenericClass apply(GenericRecord input) {
+      return new GenericClass(
+          (int) input.get("intField"), input.get("stringField").toString());
+    }
+  }
+
   @Test
   @Category(NeedsRunner.class)
-  public void testAvroIOWriteAndReadASingleFile() throws Throwable {
+  public void testAvroIOWriteAndReadAndParseASingleFile() throws Throwable {
     List<GenericClass> values =
         ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -153,23 +163,45 @@ public class AvroIOTest {
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    // Test the same data via read(), read().withHintMatchesManyFiles(), and readAll()
+    // Test the same data using all versions of read().
+    PCollection<String> path =
+        readPipeline.apply("Create path", Create.of(outputFile.getAbsolutePath()));
     PAssert.that(
-            readPipeline.apply(
-                "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        readPipeline.apply(
+            "Read", AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
         .containsInAnyOrder(values);
     PAssert.that(
-            readPipeline.apply(
-                "Read withHintMatchesManyFiles",
-                AvroIO.read(GenericClass.class)
-                    .from(outputFile.getAbsolutePath())
-                    .withHintMatchesManyFiles()))
+        readPipeline.apply(
+            "Read withHintMatchesManyFiles",
+            AvroIO.read(GenericClass.class)
+                .from(outputFile.getAbsolutePath())
+                .withHintMatchesManyFiles()))
         .containsInAnyOrder(values);
     PAssert.that(
-            "ReadAll",
-            readPipeline
-                .apply(Create.of(outputFile.getAbsolutePath()))
-                .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        path.apply(
+            "ReadAll", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        readPipeline.apply(
+            "Parse",
+            AvroIO.parseGenericRecords(new ParseGenericClass())
+                .from(outputFile.getAbsolutePath())
+                .withCoder(AvroCoder.of(GenericClass.class))))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        readPipeline.apply(
+            "Parse withHintMatchesManyFiles",
+            AvroIO.parseGenericRecords(new ParseGenericClass())
+                .from(outputFile.getAbsolutePath())
+                .withCoder(AvroCoder.of(GenericClass.class))
+                .withHintMatchesManyFiles()))
+        .containsInAnyOrder(values);
+    PAssert.that(
+        path.apply(
+            "ParseAll",
+            AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                .withCoder(AvroCoder.of(GenericClass.class))
+                .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(values);
 
     readPipeline.run();
@@ -200,7 +232,7 @@ public class AvroIOTest {
                 .withNumShards(3));
     writePipeline.run().waitUntilFinish();
 
-    // Test both read() and readAll()
+    // Test read(), readAll(), and parseAllGenericRecords().
     PAssert.that(
             readPipeline.apply(
                 "Read first",
@@ -213,15 +245,22 @@ public class AvroIOTest {
                 AvroIO.read(GenericClass.class)
                     .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
         .containsInAnyOrder(secondValues);
+    PCollection<String> paths =
+        readPipeline.apply(
+            "Create paths",
+            Create.of(
+                tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+    PAssert.that(
+            paths.apply(
+                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
     PAssert.that(
-            readPipeline
-                .apply(
-                    "Create paths",
-                    Create.of(
-                        tmpFolder.getRoot().getAbsolutePath() + "/first*",
-                        tmpFolder.getRoot().getAbsolutePath() + "/second*"))
-                .apply(
-                    "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+            paths.apply(
+                "Parse all",
+                AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                    .withCoder(AvroCoder.of(GenericClass.class))
+                    .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
     readPipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index bf2ac95..714e029 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.hamcrest.Matchers;
@@ -407,11 +408,6 @@ public class AvroSourceTest {
     source = AvroSource.from(filename).withSchema(schemaString);
     records = SourceTestUtils.readFromSource(source, null);
     assertEqualsWithGeneric(expected, records);
-
-    // Create a source with no schema
-    source = AvroSource.from(filename);
-    records = SourceTestUtils.readFromSource(source, null);
-    assertEqualsWithGeneric(expected, records);
   }
 
   @Test
@@ -449,6 +445,30 @@ public class AvroSourceTest {
     assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
   }
 
+  @Test
+  public void testParseFn() throws Exception {
+    List<Bird> expected = createRandomRecords(100);
+    String filename = generateTestFile("tmp.avro", expected, SyncBehavior.SYNC_DEFAULT, 0,
+        AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
+
+    AvroSource<Bird> source =
+        AvroSource.from(filename)
+            .withParseFn(
+                new SerializableFunction<GenericRecord, Bird>() {
+                  @Override
+                  public Bird apply(GenericRecord input) {
+                    return new Bird(
+                        (long) input.get("number"),
+                        input.get("species").toString(),
+                        input.get("quality").toString(),
+                        (long) input.get("quantity"));
+                  }
+                },
+                AvroCoder.of(Bird.class));
+    List<Bird> actual = SourceTestUtils.readFromSource(source, null);
+    assertThat(actual, containsInAnyOrder(expected.toArray()));
+  }
+
   private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
     assertEquals(expected.size(), actual.size());
     for (int i = 0; i < expected.size(); i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ebd00411/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 2b1eafe..6c118a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -183,8 +183,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
     for (ResourceId file : files) {
-      avroSources.add(new TransformingSource<>(
-          AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
+      avroSources.add(
+          AvroSource.from(file.toString()).withParseFn(function, getDefaultOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }