You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/21 17:50:24 UTC

[GitHub] [beam] aromanenko-dev opened a new pull request, #24294: Create an Avro extension for Java SDK

aromanenko-dev opened a new pull request, #24294:
URL: https://github.com/apache/beam/pull/24294

   Create a dedicated extension for Java SDK to support Avro based on Avro-related code from `sdks/java/core`.
   
   Closes #24293
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1365250668

   @mosche Thanks for review, I believe I addressed all your comments, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054589320


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -0,0 +1,1341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.implementation.bytecode.Duplication;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import net.bytebuddy.implementation.bytecode.TypeCreation;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForSetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Days;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/** Utils to convert AVRO records to Beam rows. */

Review Comment:
   Good question... I don't remember why it's disappeared, I'll fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1365144162

   Run Java_GCP_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054424906


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSink.java:
##########
@@ -0,0 +1,151 @@
+/*

Review Comment:
   Looks like `syncInterval` was removed? Why that?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/DynamicAvroDestinations.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link org.apache.beam.sdk.io.AvroIO}. In
+ * addition to dynamic file destinations, this allows specifying other AVRO properties (schema,
+ * metadata, codec, datum writer) per destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>

Review Comment:
   That doesn't seem to exist in core, how comes?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically setup the Beam related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested on making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvents> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(

Review Comment:
   Wrong types



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically setup the Beam related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested on making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvents> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema,
+        org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
+      org.apache.beam.sdk.io.AvroSource<?> source =
+          org.apache.beam.sdk.io.AvroSource.from(filepattern)
+              .withEmptyMatchTreatment(emptyMatchTreatment);
+
+      if (readerFactory != null) {
+        source = source.withDatumReaderFactory(readerFactory);
+      }
+      return recordClass == GenericRecord.class
+          ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T>
+        getDatumReaderFactory();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setDatumReaderFactory(
+          org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory);
+
+      abstract ReadFiles<T> build();
+    }
+
+    @VisibleForTesting
+    ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    public ReadFiles<T> withDatumReaderFactory(
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory) {
+      return toBuilder().setDatumReaderFactory(factory).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input.apply(
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  getDesiredBundleSizeBytes(),
+                  new CreateSourceFn<>(
+                      getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
+                  AvroCoder.of(getRecordClass(), getSchema()),
+                  getUsesReshuffle(),
+                  getFileExceptionHandler()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #readAll}.
+   *
+   * @deprecated See {@link #readAll(Class)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract ReadAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public ReadAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @VisibleForTesting
+    ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input
+              .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+              .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+              .apply(readFiles(getRecordClass()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  private static class CreateSourceFn<T>
+      implements SerializableFunction<String, FileBasedSource<T>> {
+    private final Class<T> recordClass;
+    private final Supplier<Schema> schemaSupplier;
+    private final org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory;
+
+    CreateSourceFn(
+        Class<T> recordClass,
+        String jsonSchema,
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory) {
+      this.recordClass = recordClass;
+      this.schemaSupplier =
+          Suppliers.memoize(
+              Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+      this.readerFactory = readerFactory;
+    }
+
+    @Override
+    public FileBasedSource<T> apply(String input) {
+      return Read.createSource(
+          StaticValueProvider.of(input),
+          EmptyMatchTreatment.DISALLOW,
+          recordClass,
+          schemaSupplier.get(),
+          readerFactory);
+    }
+
+    private static class JsonToSchema implements Function<String, Schema>, Serializable {
+      @Override
+      public Schema apply(String input) {
+        return new Schema.Parser().parse(input);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseGenericRecords}. */
+  @AutoValue
+  public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+      abstract Parse<T> build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Parse<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #from(String)}. */
+    public Parse<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Parse<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public Parse<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    /** Sets a coder for the result of the parse function. */
+    public Parse<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Like {@link Read#withHintMatchesManyFiles()}. */
+    public Parse<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        return input.apply(
+            org.apache.beam.sdk.io.Read.from(
+                org.apache.beam.sdk.io.AvroSource.from(getFilepattern())
+                    .withParseFn(getParseFn(), coder)));
+      }
+
+      // All other cases go through FileIO + ParseFilesGenericRecords.
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ParseFiles", parseFilesGenericRecords(getParseFn()).withCoder(coder));
+    }
+
+    private static <T> Coder<T> inferCoder(
+        @Nullable Coder<T> explicitCoder,
+        SerializableFunction<GenericRecord, T> parseFn,
+        CoderRegistry coderRegistry) {
+      if (explicitCoder != null) {
+        return explicitCoder;
+      }
+      // If a coder was not specified explicitly, infer it from parse fn.
+      try {
+        return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+            e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseFilesGenericRecords}. */
+  @AutoValue
+  public abstract static class ParseFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseFiles<T> build();
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseFiles<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    @VisibleForTesting
+    ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      final Coder<T> coder =
+          Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+      final SerializableFunction<String, FileBasedSource<T>> createSource =
+          new CreateParseSourceFn<>(parseFn, coder);
+      return input.apply(
+          "Parse Files via FileBasedSource",
+          new ReadAllViaFileBasedSource<>(
+              getDesiredBundleSizeBytes(),
+              createSource,
+              coder,
+              getUsesReshuffle(),
+              getFileExceptionHandler()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+
+    private static class CreateParseSourceFn<T>
+        implements SerializableFunction<String, FileBasedSource<T>> {
+      private final SerializableFunction<GenericRecord, T> parseFn;
+      private final Coder<T> coder;
+
+      CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) {
+        this.parseFn = parseFn;
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedSource<T> apply(String input) {
+        return AvroSource.from(input).withParseFn(parseFn, coder);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #parseAllGenericRecords}.
+   *
+   * @deprecated See {@link #parseAllGenericRecords(SerializableFunction)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseAll<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @VisibleForTesting
+    ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      return input
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply(
+              "Parse all via FileBasedSource",
+              parseFilesGenericRecords(getParseFn()).withCoder(getCoder()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class TypedWrite<UserT, DestinationT, OutputT>

Review Comment:
   Why was `syncInterval` (see `AvroSink`) removed?



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.AvroSchema;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for AVRO schema classes. */
+public class AvroSchemaTest {
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroSubPojo {
+    @AvroName("BOOL_NON_NULLABLE")
+    public boolean boolNonNullable;
+
+    @AvroName("int")
+    @org.apache.avro.reflect.Nullable
+    public Integer anInt;
+
+    public AvroSubPojo(boolean boolNonNullable, Integer anInt) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+    }
+
+    public AvroSubPojo() {}
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroSubPojo)) {
+        return false;
+      }
+      AvroSubPojo that = (AvroSubPojo) o;
+      return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, that.anInt);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(boolNonNullable, anInt);
+    }
+
+    @Override
+    public String toString() {
+      return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", anInt=" + anInt + '}';
+    }
+  }
+
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroPojo {
+    public @AvroName("bool_non_nullable") boolean boolNonNullable;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("int") Integer anInt;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("long") Long aLong;
+
+    @AvroName("float")
+    @org.apache.avro.reflect.Nullable
+    public Float aFloat;
+
+    @AvroName("double")
+    @org.apache.avro.reflect.Nullable
+    public Double aDouble;
+
+    @org.apache.avro.reflect.Nullable public String string;
+    @org.apache.avro.reflect.Nullable public ByteBuffer bytes;
+
+    @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}")
+    public byte[] fixed;
+
+    @AvroSchema("{\"type\": \"int\", \"logicalType\": \"date\"}")
+    public LocalDate date;
+
+    @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
+    public DateTime timestampMillis;
+
+    @AvroSchema("{\"name\": \"TestEnum\", \"type\": \"enum\", \"symbols\": [\"abc\",\"cde\"] }")
+    public TestEnum testEnum;
+
+    @org.apache.avro.reflect.Nullable public AvroSubPojo row;
+    @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array;
+    @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map;
+    @AvroIgnore String extraField;
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroPojo)) {
+        return false;
+      }
+      AvroPojo avroPojo = (AvroPojo) o;
+      return boolNonNullable == avroPojo.boolNonNullable
+          && Objects.equals(anInt, avroPojo.anInt)
+          && Objects.equals(aLong, avroPojo.aLong)
+          && Objects.equals(aFloat, avroPojo.aFloat)
+          && Objects.equals(aDouble, avroPojo.aDouble)
+          && Objects.equals(string, avroPojo.string)
+          && Objects.equals(bytes, avroPojo.bytes)
+          && Arrays.equals(fixed, avroPojo.fixed)
+          && Objects.equals(date, avroPojo.date)
+          && Objects.equals(timestampMillis, avroPojo.timestampMillis)
+          && Objects.equals(testEnum, avroPojo.testEnum)
+          && Objects.equals(row, avroPojo.row)
+          && Objects.equals(array, avroPojo.array)
+          && Objects.equals(map, avroPojo.map);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          boolNonNullable,
+          anInt,
+          aLong,
+          aFloat,
+          aDouble,
+          string,
+          bytes,
+          Arrays.hashCode(fixed),
+          date,
+          timestampMillis,
+          testEnum,
+          row,
+          array,
+          map);
+    }
+
+    public AvroPojo(
+        boolean boolNonNullable,
+        int anInt,
+        long aLong,
+        float aFloat,
+        double aDouble,
+        String string,
+        ByteBuffer bytes,
+        byte[] fixed,
+        LocalDate date,
+        DateTime timestampMillis,
+        TestEnum testEnum,
+        AvroSubPojo row,
+        List<AvroSubPojo> array,
+        Map<String, AvroSubPojo> map) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aFloat = aFloat;
+      this.aDouble = aDouble;
+      this.string = string;
+      this.bytes = bytes;
+      this.fixed = fixed;
+      this.date = date;
+      this.timestampMillis = timestampMillis;
+      this.testEnum = testEnum;
+      this.row = row;
+      this.array = array;
+      this.map = map;
+      this.extraField = "";
+    }
+
+    public AvroPojo() {}
+
+    @Override
+    public String toString() {
+      return "AvroPojo{"
+          + "boolNonNullable="
+          + boolNonNullable
+          + ", anInt="
+          + anInt
+          + ", aLong="
+          + aLong
+          + ", aFloat="
+          + aFloat
+          + ", aDouble="
+          + aDouble
+          + ", string='"
+          + string
+          + '\''
+          + ", bytes="
+          + bytes
+          + ", fixed="
+          + Arrays.toString(fixed)
+          + ", date="
+          + date
+          + ", timestampMillis="
+          + timestampMillis
+          + ", testEnum="
+          + testEnum
+          + ", row="
+          + row
+          + ", array="
+          + array
+          + ", map="
+          + map
+          + ", extraField='"
+          + extraField
+          + '\''
+          + '}';
+    }
+  }
+
+  private static final Schema SUBSCHEMA =
+      Schema.builder()
+          .addField("BOOL_NON_NULLABLE", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .build();
+  private static final FieldType SUB_TYPE = FieldType.row(SUBSCHEMA).withNullable(true);
+
+  private static final EnumerationType TEST_ENUM_TYPE = EnumerationType.create("abc", "cde");
+
+  private static final Schema SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FieldType.logicalType(FixedBytes.of(4)))
+          .addField("date", FieldType.DATETIME)
+          .addField("timestampMillis", FieldType.DATETIME)
+          .addField("TestEnum", FieldType.logicalType(TEST_ENUM_TYPE))
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", FieldType.array(SUB_TYPE))
+          .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE))
+          .build();
+
+  private static final Schema POJO_SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FieldType.logicalType(FixedBytes.of(4)))
+          .addField("date", FieldType.DATETIME)
+          .addField("timestampMillis", FieldType.DATETIME)
+          .addField("testEnum", FieldType.logicalType(TEST_ENUM_TYPE))
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", FieldType.array(SUB_TYPE.withNullable(false)))
+          .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE.withNullable(false)))
+          .build();
+
+  private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
+  private static final DateTime DATE_TIME =
+      new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4);
+  private static final LocalDate DATE = new LocalDate(1979, 3, 14);
+  private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42);
+  private static final TestAvro AVRO_SPECIFIC_RECORD =
+      new TestAvro(
+          true,
+          43,
+          44L,
+          (float) 44.1,
+          (double) 44.2,
+          "mystring",
+          ByteBuffer.wrap(BYTE_ARRAY),
+          new fixed4(BYTE_ARRAY),
+          DATE,
+          DATE_TIME,
+          TestEnum.abc,
+          AVRO_NESTED_SPECIFIC_RECORD,
+          ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
+          ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD));
+  private static final GenericRecord AVRO_NESTED_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvroNested.SCHEMA$)
+          .set("BOOL_NON_NULLABLE", true)
+          .set("int", 42)
+          .build();
+  private static final GenericRecord AVRO_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvro.SCHEMA$)
+          .set("bool_non_nullable", true)
+          .set("int", 43)
+          .set("long", 44L)
+          .set("float", (float) 44.1)
+          .set("double", (double) 44.2)
+          .set("string", new Utf8("mystring"))
+          .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
+          .set(
+              "fixed",
+              GenericData.get()
+                  .createFixed(
+                      null, BYTE_ARRAY, org.apache.avro.Schema.createFixed("fixed4", "", "", 4)))
+          .set("date", (int) Days.daysBetween(new LocalDate(1970, 1, 1), DATE).getDays())
+          .set("timestampMillis", DATE_TIME.getMillis())
+          .set("TestEnum", TestEnum.abc)
+          .set("row", AVRO_NESTED_GENERIC_RECORD)
+          .set("array", ImmutableList.of(AVRO_NESTED_GENERIC_RECORD, AVRO_NESTED_GENERIC_RECORD))
+          .set(
+              "map",
+              ImmutableMap.of(
+                  new Utf8("k1"), AVRO_NESTED_GENERIC_RECORD,
+                  new Utf8("k2"), AVRO_NESTED_GENERIC_RECORD))
+          .build();
+
+  private static final Row NESTED_ROW = Row.withSchema(SUBSCHEMA).addValues(true, 42).build();
+  private static final Row ROW =
+      Row.withSchema(SCHEMA)
+          .addValues(
+              true,
+              43,
+              44L,
+              (float) 44.1,
+              (double) 44.2,
+              "mystring",
+              ByteBuffer.wrap(BYTE_ARRAY),
+              BYTE_ARRAY,
+              DATE.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+              DATE_TIME,
+              TEST_ENUM_TYPE.valueOf("abc"),
+              NESTED_ROW,
+              ImmutableList.of(NESTED_ROW, NESTED_ROW),
+              ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
+          .build();
+
+  @Test
+  public void testSpecificRecordSchema() {
+    assertEquals(
+        SCHEMA,
+        new org.apache.beam.sdk.schemas.AvroRecordSchema()

Review Comment:
   Wrong type here and below



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -0,0 +1,1341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.implementation.bytecode.Duplication;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import net.bytebuddy.implementation.bytecode.TypeCreation;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForSetter;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
+import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Days;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/** Utils to convert AVRO records to Beam rows. */

Review Comment:
   Why removing the original Javadocs?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically setup the Beam related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested on making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvents> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema,
+        org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
+      org.apache.beam.sdk.io.AvroSource<?> source =
+          org.apache.beam.sdk.io.AvroSource.from(filepattern)
+              .withEmptyMatchTreatment(emptyMatchTreatment);
+
+      if (readerFactory != null) {
+        source = source.withDatumReaderFactory(readerFactory);
+      }
+      return recordClass == GenericRecord.class
+          ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T>
+        getDatumReaderFactory();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setDatumReaderFactory(
+          org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory);

Review Comment:
   Be careful, some types are the old ones from core



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054585075


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically setup the Beam related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested on making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvents> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema,
+        org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
+      org.apache.beam.sdk.io.AvroSource<?> source =
+          org.apache.beam.sdk.io.AvroSource.from(filepattern)
+              .withEmptyMatchTreatment(emptyMatchTreatment);
+
+      if (readerFactory != null) {
+        source = source.withDatumReaderFactory(readerFactory);
+      }
+      return recordClass == GenericRecord.class
+          ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T>
+        getDatumReaderFactory();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setDatumReaderFactory(
+          org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory);
+
+      abstract ReadFiles<T> build();
+    }
+
+    @VisibleForTesting
+    ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    public ReadFiles<T> withDatumReaderFactory(
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory) {
+      return toBuilder().setDatumReaderFactory(factory).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input.apply(
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  getDesiredBundleSizeBytes(),
+                  new CreateSourceFn<>(
+                      getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
+                  AvroCoder.of(getRecordClass(), getSchema()),
+                  getUsesReshuffle(),
+                  getFileExceptionHandler()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"));
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #readAll}.
+   *
+   * @deprecated See {@link #readAll(Class)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract ReadAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public ReadAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @VisibleForTesting
+    ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      PCollection<T> read =
+          input
+              .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+              .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+              .apply(readFiles(getRecordClass()));
+      return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  private static class CreateSourceFn<T>
+      implements SerializableFunction<String, FileBasedSource<T>> {
+    private final Class<T> recordClass;
+    private final Supplier<Schema> schemaSupplier;
+    private final org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory;
+
+    CreateSourceFn(
+        Class<T> recordClass,
+        String jsonSchema,
+        org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> readerFactory) {
+      this.recordClass = recordClass;
+      this.schemaSupplier =
+          Suppliers.memoize(
+              Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+      this.readerFactory = readerFactory;
+    }
+
+    @Override
+    public FileBasedSource<T> apply(String input) {
+      return Read.createSource(
+          StaticValueProvider.of(input),
+          EmptyMatchTreatment.DISALLOW,
+          recordClass,
+          schemaSupplier.get(),
+          readerFactory);
+    }
+
+    private static class JsonToSchema implements Function<String, Schema>, Serializable {
+      @Override
+      public Schema apply(String input) {
+        return new Schema.Parser().parse(input);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseGenericRecords}. */
+  @AutoValue
+  public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
+
+      abstract Parse<T> build();
+    }
+
+    /** Reads from the given filename or filepattern. */
+    public Parse<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #from(String)}. */
+    public Parse<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Parse<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles}. */
+    public Parse<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    /** Sets a coder for the result of the parse function. */
+    public Parse<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Like {@link Read#withHintMatchesManyFiles()}. */
+    public Parse<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        return input.apply(
+            org.apache.beam.sdk.io.Read.from(
+                org.apache.beam.sdk.io.AvroSource.from(getFilepattern())
+                    .withParseFn(getParseFn(), coder)));
+      }
+
+      // All other cases go through FileIO + ParseFilesGenericRecords.
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ParseFiles", parseFilesGenericRecords(getParseFn()).withCoder(coder));
+    }
+
+    private static <T> Coder<T> inferCoder(
+        @Nullable Coder<T> explicitCoder,
+        SerializableFunction<GenericRecord, T> parseFn,
+        CoderRegistry coderRegistry) {
+      if (explicitCoder != null) {
+        return explicitCoder;
+      }
+      // If a coder was not specified explicitly, infer it from parse fn.
+      try {
+        return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+            e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #parseFilesGenericRecords}. */
+  @AutoValue
+  public abstract static class ParseFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseFiles<T> build();
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseFiles<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
+    @VisibleForTesting
+    ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      final Coder<T> coder =
+          Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
+      final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+      final SerializableFunction<String, FileBasedSource<T>> createSource =
+          new CreateParseSourceFn<>(parseFn, coder);
+      return input.apply(
+          "Parse Files via FileBasedSource",
+          new ReadAllViaFileBasedSource<>(
+              getDesiredBundleSizeBytes(),
+              createSource,
+              coder,
+              getUsesReshuffle(),
+              getFileExceptionHandler()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+    }
+
+    private static class CreateParseSourceFn<T>
+        implements SerializableFunction<String, FileBasedSource<T>> {
+      private final SerializableFunction<GenericRecord, T> parseFn;
+      private final Coder<T> coder;
+
+      CreateParseSourceFn(SerializableFunction<GenericRecord, T> parseFn, Coder<T> coder) {
+        this.parseFn = parseFn;
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedSource<T> apply(String input) {
+        return AvroSource.from(input).withParseFn(parseFn, coder);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Implementation of {@link #parseAllGenericRecords}.
+   *
+   * @deprecated See {@link #parseAllGenericRecords(SerializableFunction)} for details.
+   */
+  @Deprecated
+  @AutoValue
+  public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract SerializableFunction<GenericRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ParseAll<T> build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ParseAll<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /** Like {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /** Specifies the coder for the result of the {@code parseFn}. */
+    public ParseAll<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @VisibleForTesting
+    ParseAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      return input
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply(
+              "Parse all via FileBasedSource",
+              parseFilesGenericRecords(getParseFn()).withCoder(getCoder()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #write}. */
+  @AutoValue
+  public abstract static class TypedWrite<UserT, DestinationT, OutputT>

Review Comment:
   I believe it was just added recently and it was not synced to the new class. I'll check the other classes and we have to do this before deprecation of "core" avro classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #24294: [WIP] Create an Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1322485141

   # [Codecov](https://codecov.io/gh/apache/beam/pull/24294?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#24294](https://codecov.io/gh/apache/beam/pull/24294?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9f2f387) into [master](https://codecov.io/gh/apache/beam/commit/e0e10b9e5432643b884c381d145e5924cc4ef193?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e0e10b9) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #24294      +/-   ##
   ==========================================
   - Coverage   73.46%   73.44%   -0.02%     
   ==========================================
     Files         709      709              
     Lines       95983    96077      +94     
   ==========================================
   + Hits        70514    70565      +51     
   - Misses      24152    24195      +43     
     Partials     1317     1317              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.16% <ø> (-0.05%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/24294?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/worker\_pool\_main.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3Bvb2xfbWFpbi5weQ==) | `56.32% <0.00%> (-2.94%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.50% <0.00%> (-1.27%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [...apache\_beam/typehints/native\_type\_compatibility.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL25hdGl2ZV90eXBlX2NvbXBhdGliaWxpdHkucHk=) | `85.16% <0.00%> (-0.37%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `92.54% <0.00%> (-0.37%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/typehints.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3R5cGVoaW50cy5weQ==) | `93.05% <0.00%> (-0.33%)` | :arrow_down: |
   | [...beam/testing/load\_tests/load\_test\_metrics\_utils.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9sb2FkX3Rlc3RzL2xvYWRfdGVzdF9tZXRyaWNzX3V0aWxzLnB5) | `33.82% <0.00%> (-0.26%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.08% <0.00%> (-0.17%)` | :arrow_down: |
   | ... and [17 more](https://codecov.io/gh/apache/beam/pull/24294/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054590709


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSink.java:
##########
@@ -0,0 +1,151 @@
+/*

Review Comment:
   It's beed added there #24454



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054390175


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(

Review Comment:
   I'd suggest to set these until ready for consumption
   ```
   publish: false, 
   exportJavadoc: false
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1361314432

   > Closes https://github.com/apache/beam/issues/24293
   
   This PR doesn't close the ticket yet, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1055165450


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/DynamicAvroDestinations.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link org.apache.beam.sdk.io.AvroIO}. In
+ * addition to dynamic file destinations, this allows specifying other AVRO properties (schema,
+ * metadata, codec, datum writer) per destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+    extends DynamicDestinations<UserT, DestinationT, OutputT> {
+  /** Return an AVRO schema for a given destination. */
+  public abstract Schema getSchema(DestinationT destination);
+
+  /** Return AVRO file metadata for a given destination. */
+  public Map<String, Object> getMetadata(DestinationT destination) {
+    return ImmutableMap.of();
+  }
+
+  /** Return an AVRO codec for a given destination. */
+  public CodecFactory getCodec(DestinationT destination) {
+    return AvroIO.TypedWrite.DEFAULT_CODEC;
+  }
+
+  /**
+   * Return a {@link org.apache.beam.sdk.io.AvroSink.DatumWriterFactory} for a given destination. If

Review Comment:
   wrong type referenced here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1359694549

   @mosche 
   CC: @kennknowles @lukecwik @reuvenlax @robertwb 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054588991


##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/AvroSchemaTest.java:
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.AvroSchema;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for AVRO schema classes. */
+public class AvroSchemaTest {
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroSubPojo {
+    @AvroName("BOOL_NON_NULLABLE")
+    public boolean boolNonNullable;
+
+    @AvroName("int")
+    @org.apache.avro.reflect.Nullable
+    public Integer anInt;
+
+    public AvroSubPojo(boolean boolNonNullable, Integer anInt) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+    }
+
+    public AvroSubPojo() {}
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroSubPojo)) {
+        return false;
+      }
+      AvroSubPojo that = (AvroSubPojo) o;
+      return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, that.anInt);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(boolNonNullable, anInt);
+    }
+
+    @Override
+    public String toString() {
+      return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", anInt=" + anInt + '}';
+    }
+  }
+
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroPojo {
+    public @AvroName("bool_non_nullable") boolean boolNonNullable;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("int") Integer anInt;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("long") Long aLong;
+
+    @AvroName("float")
+    @org.apache.avro.reflect.Nullable
+    public Float aFloat;
+
+    @AvroName("double")
+    @org.apache.avro.reflect.Nullable
+    public Double aDouble;
+
+    @org.apache.avro.reflect.Nullable public String string;
+    @org.apache.avro.reflect.Nullable public ByteBuffer bytes;
+
+    @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}")
+    public byte[] fixed;
+
+    @AvroSchema("{\"type\": \"int\", \"logicalType\": \"date\"}")
+    public LocalDate date;
+
+    @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
+    public DateTime timestampMillis;
+
+    @AvroSchema("{\"name\": \"TestEnum\", \"type\": \"enum\", \"symbols\": [\"abc\",\"cde\"] }")
+    public TestEnum testEnum;
+
+    @org.apache.avro.reflect.Nullable public AvroSubPojo row;
+    @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array;
+    @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map;
+    @AvroIgnore String extraField;
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroPojo)) {
+        return false;
+      }
+      AvroPojo avroPojo = (AvroPojo) o;
+      return boolNonNullable == avroPojo.boolNonNullable
+          && Objects.equals(anInt, avroPojo.anInt)
+          && Objects.equals(aLong, avroPojo.aLong)
+          && Objects.equals(aFloat, avroPojo.aFloat)
+          && Objects.equals(aDouble, avroPojo.aDouble)
+          && Objects.equals(string, avroPojo.string)
+          && Objects.equals(bytes, avroPojo.bytes)
+          && Arrays.equals(fixed, avroPojo.fixed)
+          && Objects.equals(date, avroPojo.date)
+          && Objects.equals(timestampMillis, avroPojo.timestampMillis)
+          && Objects.equals(testEnum, avroPojo.testEnum)
+          && Objects.equals(row, avroPojo.row)
+          && Objects.equals(array, avroPojo.array)
+          && Objects.equals(map, avroPojo.map);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          boolNonNullable,
+          anInt,
+          aLong,
+          aFloat,
+          aDouble,
+          string,
+          bytes,
+          Arrays.hashCode(fixed),
+          date,
+          timestampMillis,
+          testEnum,
+          row,
+          array,
+          map);
+    }
+
+    public AvroPojo(
+        boolean boolNonNullable,
+        int anInt,
+        long aLong,
+        float aFloat,
+        double aDouble,
+        String string,
+        ByteBuffer bytes,
+        byte[] fixed,
+        LocalDate date,
+        DateTime timestampMillis,
+        TestEnum testEnum,
+        AvroSubPojo row,
+        List<AvroSubPojo> array,
+        Map<String, AvroSubPojo> map) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aFloat = aFloat;
+      this.aDouble = aDouble;
+      this.string = string;
+      this.bytes = bytes;
+      this.fixed = fixed;
+      this.date = date;
+      this.timestampMillis = timestampMillis;
+      this.testEnum = testEnum;
+      this.row = row;
+      this.array = array;
+      this.map = map;
+      this.extraField = "";
+    }
+
+    public AvroPojo() {}
+
+    @Override
+    public String toString() {
+      return "AvroPojo{"
+          + "boolNonNullable="
+          + boolNonNullable
+          + ", anInt="
+          + anInt
+          + ", aLong="
+          + aLong
+          + ", aFloat="
+          + aFloat
+          + ", aDouble="
+          + aDouble
+          + ", string='"
+          + string
+          + '\''
+          + ", bytes="
+          + bytes
+          + ", fixed="
+          + Arrays.toString(fixed)
+          + ", date="
+          + date
+          + ", timestampMillis="
+          + timestampMillis
+          + ", testEnum="
+          + testEnum
+          + ", row="
+          + row
+          + ", array="
+          + array
+          + ", map="
+          + map
+          + ", extraField='"
+          + extraField
+          + '\''
+          + '}';
+    }
+  }
+
+  private static final Schema SUBSCHEMA =
+      Schema.builder()
+          .addField("BOOL_NON_NULLABLE", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .build();
+  private static final FieldType SUB_TYPE = FieldType.row(SUBSCHEMA).withNullable(true);
+
+  private static final EnumerationType TEST_ENUM_TYPE = EnumerationType.create("abc", "cde");
+
+  private static final Schema SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FieldType.logicalType(FixedBytes.of(4)))
+          .addField("date", FieldType.DATETIME)
+          .addField("timestampMillis", FieldType.DATETIME)
+          .addField("TestEnum", FieldType.logicalType(TEST_ENUM_TYPE))
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", FieldType.array(SUB_TYPE))
+          .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE))
+          .build();
+
+  private static final Schema POJO_SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FieldType.logicalType(FixedBytes.of(4)))
+          .addField("date", FieldType.DATETIME)
+          .addField("timestampMillis", FieldType.DATETIME)
+          .addField("testEnum", FieldType.logicalType(TEST_ENUM_TYPE))
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", FieldType.array(SUB_TYPE.withNullable(false)))
+          .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE.withNullable(false)))
+          .build();
+
+  private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
+  private static final DateTime DATE_TIME =
+      new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4);
+  private static final LocalDate DATE = new LocalDate(1979, 3, 14);
+  private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42);
+  private static final TestAvro AVRO_SPECIFIC_RECORD =
+      new TestAvro(
+          true,
+          43,
+          44L,
+          (float) 44.1,
+          (double) 44.2,
+          "mystring",
+          ByteBuffer.wrap(BYTE_ARRAY),
+          new fixed4(BYTE_ARRAY),
+          DATE,
+          DATE_TIME,
+          TestEnum.abc,
+          AVRO_NESTED_SPECIFIC_RECORD,
+          ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
+          ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD));
+  private static final GenericRecord AVRO_NESTED_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvroNested.SCHEMA$)
+          .set("BOOL_NON_NULLABLE", true)
+          .set("int", 42)
+          .build();
+  private static final GenericRecord AVRO_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvro.SCHEMA$)
+          .set("bool_non_nullable", true)
+          .set("int", 43)
+          .set("long", 44L)
+          .set("float", (float) 44.1)
+          .set("double", (double) 44.2)
+          .set("string", new Utf8("mystring"))
+          .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
+          .set(
+              "fixed",
+              GenericData.get()
+                  .createFixed(
+                      null, BYTE_ARRAY, org.apache.avro.Schema.createFixed("fixed4", "", "", 4)))
+          .set("date", (int) Days.daysBetween(new LocalDate(1970, 1, 1), DATE).getDays())
+          .set("timestampMillis", DATE_TIME.getMillis())
+          .set("TestEnum", TestEnum.abc)
+          .set("row", AVRO_NESTED_GENERIC_RECORD)
+          .set("array", ImmutableList.of(AVRO_NESTED_GENERIC_RECORD, AVRO_NESTED_GENERIC_RECORD))
+          .set(
+              "map",
+              ImmutableMap.of(
+                  new Utf8("k1"), AVRO_NESTED_GENERIC_RECORD,
+                  new Utf8("k2"), AVRO_NESTED_GENERIC_RECORD))
+          .build();
+
+  private static final Row NESTED_ROW = Row.withSchema(SUBSCHEMA).addValues(true, 42).build();
+  private static final Row ROW =
+      Row.withSchema(SCHEMA)
+          .addValues(
+              true,
+              43,
+              44L,
+              (float) 44.1,
+              (double) 44.2,
+              "mystring",
+              ByteBuffer.wrap(BYTE_ARRAY),
+              BYTE_ARRAY,
+              DATE.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+              DATE_TIME,
+              TEST_ENUM_TYPE.valueOf("abc"),
+              NESTED_ROW,
+              ImmutableList.of(NESTED_ROW, NESTED_ROW),
+              ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
+          .build();
+
+  @Test
+  public void testSpecificRecordSchema() {
+    assertEquals(
+        SCHEMA,
+        new org.apache.beam.sdk.schemas.AvroRecordSchema()

Review Comment:
   Good question... I don't remember why it's disappeared, I'll fix it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1361455975

   @mosche 
   > This PR doesn't close the ticket yet, right?
   
   It does since it's just a sub-task of #24292 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1055164683


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/DynamicAvroDestinations.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link org.apache.beam.sdk.io.AvroIO}. In
+ * addition to dynamic file destinations, this allows specifying other AVRO properties (schema,
+ * metadata, codec, datum writer) per destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>

Review Comment:
   my bad, misspelled it 🤦 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1365209279

   Run Java_Pulsar_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054578980


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -0,0 +1,2026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s for reading and writing Avro files.
+ *
+ * <h2>Reading Avro files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
+ * pipeline construction time, use {@link #read}, using {@link Read#from} to specify the filename or
+ * filepattern to read from. If the filepatterns to be read are themselves in a {@link PCollection}
+ * you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them. If the schema
+ * is unknown at pipeline construction time, use {@link #parseGenericRecords} or {@link
+ * #parseFilesGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
+ * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
+ * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
+ * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
+ * Use {@link Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
+ * to configure this behavior.
+ *
+ * <h3>Reading records of a known schema</h3>
+ *
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
+ * plus {@link #readFilesGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // Read Avro-generated classes from files on GCS
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
+ *
+ * // Read GenericRecord's of the given schema from files on GCS
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records =
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
+ * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<Foo> records =
+ *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }</pre>
+ *
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * PCollection<Foo> records =
+ *     filepatterns
+ *         .apply(FileIO.matchAll())
+ *         .apply(FileIO.readMatches())
+ *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
+ * }</pre>
+ *
+ * <h3>Streaming new files matching a filepattern</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h3>Inferring Beam schemas from Avro files</h3>
+ *
+ * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
+ * configure the read transform to infer the Beam schema and automatically setup the Beam related
+ * coders by doing:
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
+ * }</pre>
+ *
+ * <h3>Inferring Beam schemas from Avro PCollections</h3>
+ *
+ * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
+ * the output of another PTransform, you may be interested on making your PCollection schema-aware
+ * so you can use the Schema-based APIs or Beam's SqlTransform.
+ *
+ * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
+ * register a schema provider for the specific Avro class to make any PCollection of these objects
+ * schema-aware.
+ *
+ * <pre>{@code
+ * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
+ * }</pre>
+ *
+ * You can also manually set an Avro-backed Schema coder for a PCollection using {@link
+ * AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records = ...
+ * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
+ * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
+ * }</pre>
+ *
+ * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
+ * PCollection to match their internal Avro schema.
+ *
+ * <pre>{@code
+ * org.apache.avro.Schema avroSchema = ...
+ * PCollection<GenericRecord> records = ...
+ * records.setCoder(AvroUtils.schemaCoder(avroSchema));
+ * }</pre>
+ *
+ * <h2>Writing Avro files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link Write}, using {@code
+ * AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FilenamePolicy)} to specify a custom file
+ * naming policy.
+ *
+ * <p>By default, {@link Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
+ * using {@link Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
+ *
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<AvroAutoGenClass> records = ...;
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ *
+ * // A Write to a sharded GCS file (runs locally and using remote execution):
+ * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
+ * PCollection<GenericRecord> records = ...;
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".avro"));
+ * }</pre>
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link Write#withWindowedWrites()} will
+ * cause windowing and triggering to be preserved. When producing windowed writes with a streaming
+ * runner that supports triggers, the number of output shards must be set explicitly using {@link
+ * Write#withNumShards(int)}; some runners may set this for you to a runner-chosen value, so you may
+ * need not set it yourself. A {@link FilenamePolicy} must be set, and unique windows and triggers
+ * must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvents> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputePerUserSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class AvroIO {
+  /**
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
+   */
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
+   * returned by {@link FileIO#readMatches}.
+   *
+   * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
+   * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
+   */
+  public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAll} using {@link FileIO} matching
+   *     plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
+   *     {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
+   * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(Schema)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .setInferBeamSchema(false)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
+  public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
+    return readFilesGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link #readAllGenericRecords(String)} using
+   *     {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
+   *     preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
+   *     and will be removed in a future version of Beam.
+   */
+  @Deprecated
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
+   * Reads Avro file(s) containing records of an unspecified schema and converting each record to a
+   * custom type.
+   */
+  public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .setParseFn(parseFn)
+        .setHintMatchesManyFiles(false)
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
+   * the input {@link PCollection}.
+   */
+  public static <T> ParseFiles<T> parseFilesGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseFiles.Builder<T>()
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
+        .build();
+  }
+
+  /**
+   * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
+   * input {@link PCollection}.
+   *
+   * @deprecated You can achieve The functionality of {@link
+   *     #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
+   *     #parseFilesGenericRecords(SerializableFunction)} ()}. This is the preferred method to make
+   *     composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
+   *     future version of Beam.
+   */
+  @Deprecated
+  public static <T> ParseAll<T> parseAllGenericRecords(
+      SerializableFunction<GenericRecord, T> parseFn) {
+    return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .setParseFn(parseFn)
+        .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .build();
+  }
+
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link TypedWrite#withFormatFunction} can
+   * be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link Write#to(DynamicAvroDestinations)} to examine
+   * the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
+  }
+
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
+        .setNumShards(0)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .setMetadata(ImmutableMap.of())
+        .setWindowedWrites(false)
+        .setNoSpilling(false);
+  }
+
+  @Experimental(Kind.SCHEMAS)
+  private static <T> PCollection<T> setBeamSchema(
+      PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
+    return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
+  }
+
+  /**
+   * 64MB is a reasonable value that allows to amortize the cost of opening files, but is not so
+   * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
+   */
+  private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+  /** Implementation of {@link #read} and {@link #readGenericRecords}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    abstract @Nullable ValueProvider<String> getFilepattern();
+
+    abstract MatchConfiguration getMatchConfiguration();
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract boolean getHintMatchesManyFiles();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
+
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least tens
+     * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
+     */
+    public Read<T> from(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read<T> from(String filepattern) {
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
+     * change.
+     *
+     * <p>This works only in runners supporting splittable {@link
+     * org.apache.beam.sdk.transforms.DoFn}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval,
+        TerminationCondition<String, ?> terminationCondition,
+        boolean matchUpdatedFiles) {
+      return withMatchConfiguration(
+          getMatchConfiguration()
+              .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
+    }
+
+    /**
+     * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
+     * matchUpdatedFiles=false}.
+     */
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return watchForNewFiles(pollInterval, terminationCondition, false);
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
+     * files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that improves
+     * performance for this case, but it may worsen performance if the filepattern matches only a
+     * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
+     * happen less efficiently within individual files).
+     */
+    public Read<T> withHintMatchesManyFiles() {
+      return toBuilder().setHintMatchesManyFiles(true).build();
+    }
+
+    /**
+     * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
+     * to be used by SQL and by the schema-transform library.
+     */
+    @Experimental(Kind.SCHEMAS)
+    public Read<T> withBeamSchemas(boolean withBeamSchemas) {
+      return toBuilder().setInferBeamSchema(withBeamSchemas).build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<T> expand(PBegin input) {
+      checkNotNull(getFilepattern(), "filepattern");
+      checkNotNull(getSchema(), "schema");
+
+      if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
+        PCollection<T> read =
+            input.apply(
+                "Read",
+                org.apache.beam.sdk.io.Read.from(
+                    createSource(
+                        getFilepattern(),
+                        getMatchConfiguration().getEmptyMatchTreatment(),
+                        getRecordClass(),
+                        getSchema(),
+                        null)));
+        return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
+      }
+
+      // All other cases go through FileIO + ReadFiles
+      ReadFiles<T> readFiles =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
+              : readFiles(getRecordClass());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(
+              "Read Matches",
+              FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply("Via ReadFiles", readFiles);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("inferBeamSchema", getInferBeamSchema())
+                  .withLabel("Infer Beam Schema"))
+          .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
+          .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> org.apache.beam.sdk.io.AvroSource<T> createSource(
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema,
+        org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
+      org.apache.beam.sdk.io.AvroSource<?> source =
+          org.apache.beam.sdk.io.AvroSource.from(filepattern)
+              .withEmptyMatchTreatment(emptyMatchTreatment);
+
+      if (readerFactory != null) {
+        source = source.withDatumReaderFactory(readerFactory);
+      }
+      return recordClass == GenericRecord.class
+          ? (org.apache.beam.sdk.io.AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+    abstract @Nullable Class<T> getRecordClass();
+
+    abstract @Nullable Schema getSchema();
+
+    abstract boolean getUsesReshuffle();
+
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract boolean getInferBeamSchema();
+
+    abstract org.apache.beam.sdk.io.AvroSource.@Nullable DatumReaderFactory<T>
+        getDatumReaderFactory();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract Builder<T> setInferBeamSchema(boolean infer);
+
+      abstract Builder<T> setDatumReaderFactory(
+          org.apache.beam.sdk.io.AvroSource.DatumReaderFactory<T> factory);

Review Comment:
   Good point, I think it was automatically added by my IDE. I'll check and fix such cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1361311656

   Run Spotless PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054526987


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(

Review Comment:
   Done. I also added `@Experimental(Kind.EXTENSION)` in extension package-info.java files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1366486535

   👀 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1054587463


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/DynamicAvroDestinations.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link org.apache.beam.sdk.io.AvroIO}. In
+ * addition to dynamic file destinations, this allows specifying other AVRO properties (schema,
+ * metadata, codec, datum writer) per destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>

Review Comment:
   Is it?
   https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #24294:
URL: https://github.com/apache/beam/pull/24294#discussion_r1055165037


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/DynamicAvroDestinations.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link org.apache.beam.sdk.io.AvroIO}. In

Review Comment:
   wrong type referenced here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1361304275

   👀 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24294:
URL: https://github.com/apache/beam/pull/24294#issuecomment-1359746781

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @Abacn for label build.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev merged pull request #24294: Create Avro extension for Java SDK

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged PR #24294:
URL: https://github.com/apache/beam/pull/24294


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org