You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/02/03 17:43:56 UTC

[beam] branch master updated: 24472 Implement FileWriteSchemaTransformProvider (#24806)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b3be8b65aff 24472 Implement FileWriteSchemaTransformProvider (#24806)
b3be8b65aff is described below

commit b3be8b65aff0649e52c4d1c7e1d19e7da34da342
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Fri Feb 3 09:43:49 2023 -0800

    24472 Implement FileWriteSchemaTransformProvider (#24806)
    
    * Implement FileWriteSchemaTransformProvider
    
    * Final cleanup
    
    * Remove unused code
    
    * Refactor file structure; add more unit tests
    
    * XmlRowAdapter Tests
    
    * Remove unused class
    
    * Remove unused test code
    
    * Further cleanup
    
    * Convert PDone to PCollection<String>
    
    * WIP
    
    * Refactor tests
    
    * wip
    
    * Final cleanup
    
    * Revert settings.gradle.kts
    
    * Patch SchemaAwareJavaBeansTest
    
    * Remove 2nd call to readpipeline run
    
    * Set numshards=1 for xml content tests
    
    * Patch code comments
    
    * Update OWNERS
    
    * Comply with Python SDK external transforms
    
    * Refactor Python SDK compliant String check
---
 .../beam/sdk/io/common/SchemaAwareJavaBeans.java   | 494 +++++++++++++++++++++
 .../sdk/io/common/SchemaAwareJavaBeansTest.java    | 175 ++++++++
 .../OWNERS                                         |   3 +
 .../build.gradle                                   |  20 +-
 .../AvroWriteSchemaTransformFormatProvider.java    |  88 ++++
 .../CsvWriteSchemaTransformFormatProvider.java}    |  31 +-
 .../FileWriteSchemaTransformConfiguration.java     |  66 +--
 .../FileWriteSchemaTransformFormatProvider.java    |  16 +-
 .../FileWriteSchemaTransformFormatProviders.java   | 148 ++++++
 .../FileWriteSchemaTransformProvider.java          | 191 ++++++++
 .../JsonWriteSchemaTransformFormatProvider.java    |  93 ++++
 .../ParquetWriteSchemaTransformFormatProvider.java | 117 +++++
 .../fileschematransform/XmlDateTimeAdapter.java}   |  36 +-
 .../sdk/io/fileschematransform/XmlRowAdapter.java  |  93 ++++
 .../sdk/io/fileschematransform/XmlRowValue.java    | 268 +++++++++++
 .../XmlWriteSchemaTransformFormatProvider.java     | 115 +++++
 .../sdk/io/fileschematransform/package-info.java   |   0
 ...FileWriteSchemaTransformFormatProviderTest.java | 104 +++++
 ...FileWriteSchemaTransformFormatProviderTest.java | 434 ++++++++++++++++++
 ...WriteSchemaTransformFormatProviderTestData.java | 231 ++++++++++
 ...ileWriteSchemaTransformFormatProvidersTest.java |  17 +-
 .../FileWriteSchemaTransformProviderTest.java      | 127 ++++++
 ...FileWriteSchemaTransformFormatProviderTest.java | 104 +++++
 ...FileWriteSchemaTransformFormatProviderTest.java | 112 +++++
 .../XmlDateTimeAdapterTest.java}                   |  32 +-
 ...FileWriteSchemaTransformFormatProviderTest.java | 120 +++++
 .../io/fileschematransform/XmlRowAdapterTest.java  | 190 ++++++++
 .../io/fileschematransform/XmlRowValueTest.java    | 391 ++++++++++++++++
 .../FileWriteSchemaTransformFormatProviders.java   | 124 ------
 settings.gradle.kts                                |   2 +-
 30 files changed, 3707 insertions(+), 235 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java
new file mode 100644
index 00000000000..702bc29c90d
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+
+/** Various Java Beans and associated schemas used in tests. */
+public class SchemaAwareJavaBeans {
+
+  private static final DefaultSchemaProvider DEFAULT_SCHEMA_PROVIDER = new DefaultSchemaProvider();
+
+  /** Convenience method for {@link AllPrimitiveDataTypes} instantiation. */
+  public static AllPrimitiveDataTypes allPrimitiveDataTypes(
+      Boolean aBoolean,
+      Byte aByte,
+      BigDecimal aDecimal,
+      Double aDouble,
+      Float aFloat,
+      Short aShort,
+      Integer anInteger,
+      Long aLong,
+      String aString) {
+    return new AutoValue_SchemaAwareJavaBeans_AllPrimitiveDataTypes.Builder()
+        .setABoolean(aBoolean)
+        .setAByte(aByte)
+        .setADecimal(aDecimal)
+        .setADouble(aDouble)
+        .setAFloat(aFloat)
+        .setAShort(aShort)
+        .setAnInteger(anInteger)
+        .setALong(aLong)
+        .setAString(aString)
+        .build();
+  }
+
+  /** Convenience method for {@link NullableAllPrimitiveDataTypes} instantiation. */
+  public static NullableAllPrimitiveDataTypes nullableAllPrimitiveDataTypes(
+      @Nullable Boolean aBoolean,
+      @Nullable Double aDouble,
+      @Nullable Float aFloat,
+      @Nullable Integer anInteger,
+      @Nullable Long aLong,
+      @Nullable String aString) {
+    return new AutoValue_SchemaAwareJavaBeans_NullableAllPrimitiveDataTypes.Builder()
+        .setABoolean(aBoolean)
+        .setADouble(aDouble)
+        .setAFloat(aFloat)
+        .setAnInteger(anInteger)
+        .setALong(aLong)
+        .setAString(aString)
+        .build();
+  }
+
+  /** Convenience method for {@link TimeContaining} instantiation. */
+  public static TimeContaining timeContaining(Instant instant, List<Instant> instantList) {
+    return new AutoValue_SchemaAwareJavaBeans_TimeContaining.Builder()
+        .setInstant(instant)
+        .setInstantList(instantList)
+        .build();
+  }
+
+  /** Convenience method for {@link ArrayPrimitiveDataTypes} instantiation. */
+  public static ArrayPrimitiveDataTypes arrayPrimitiveDataTypes(
+      List<Boolean> booleans,
+      List<Double> doubles,
+      List<Float> floats,
+      List<Short> shorts,
+      List<Integer> integers,
+      List<Long> longs,
+      List<String> strings) {
+    return new AutoValue_SchemaAwareJavaBeans_ArrayPrimitiveDataTypes.Builder()
+        .setBooleanList(booleans)
+        .setDoubleList(doubles)
+        .setFloatList(floats)
+        .setShortList(shorts)
+        .setIntegerList(integers)
+        .setLongList(longs)
+        .setStringList(strings)
+        .build();
+  }
+
+  /** Convenience method for {@link SinglyNestedDataTypes} instantiation. */
+  public static SinglyNestedDataTypes singlyNestedDataTypes(
+      AllPrimitiveDataTypes allPrimitiveDataTypes, AllPrimitiveDataTypes... repeated) {
+    return new AutoValue_SchemaAwareJavaBeans_SinglyNestedDataTypes.Builder()
+        .setAllPrimitiveDataTypes(allPrimitiveDataTypes)
+        .setAllPrimitiveDataTypesList(Arrays.stream(repeated).collect(Collectors.toList()))
+        .build();
+  }
+
+  /** Convenience method for {@link DoublyNestedDataTypes} instantiation. */
+  public static DoublyNestedDataTypes doublyNestedDataTypes(
+      SinglyNestedDataTypes singlyNestedDataTypes, SinglyNestedDataTypes... repeated) {
+    return new AutoValue_SchemaAwareJavaBeans_DoublyNestedDataTypes.Builder()
+        .setSinglyNestedDataTypes(singlyNestedDataTypes)
+        .setSinglyNestedDataTypesList(Arrays.stream(repeated).collect(Collectors.toList()))
+        .build();
+  }
+
+  private static final TypeDescriptor<AllPrimitiveDataTypes>
+      ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(AllPrimitiveDataTypes.class);
+
+  /** The schema for {@link AllPrimitiveDataTypes}. */
+  public static final Schema ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link AllPrimitiveDataTypes} to a
+   * {@link Row}.
+   */
+  public static SerializableFunction<AllPrimitiveDataTypes, Row> allPrimitiveDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * AllPrimitiveDataTypes}.
+   */
+  public static SerializableFunction<Row, AllPrimitiveDataTypes> allPrimitiveDataTypesFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<NullableAllPrimitiveDataTypes>
+      NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR =
+          TypeDescriptor.of(NullableAllPrimitiveDataTypes.class);
+
+  /** The schema for {@link NullableAllPrimitiveDataTypes}. */
+  public static final Schema NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link NullableAllPrimitiveDataTypes}
+   * to a {@link Row}.
+   */
+  public static SerializableFunction<NullableAllPrimitiveDataTypes, Row>
+      nullableAllPrimitiveDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * NullableAllPrimitiveDataTypes}.
+   */
+  public static SerializableFunction<Row, NullableAllPrimitiveDataTypes>
+      nullableAllPrimitiveDataTypesFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(
+        NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR =
+      TypeDescriptor.of(TimeContaining.class);
+
+  /** The schema for {@link TimeContaining}. */
+  public static final Schema TIME_CONTAINING_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(TIME_CONTAINING_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link TimeContaining} to a {@link
+   * Row}.
+   */
+  public static SerializableFunction<TimeContaining, Row> timeContainingToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * TimeContaining}.
+   */
+  public static SerializableFunction<Row, TimeContaining> timeContainingFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<ArrayPrimitiveDataTypes>
+      ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(ArrayPrimitiveDataTypes.class);
+
+  /** The schema for {@link ArrayPrimitiveDataTypes}. */
+  public static final Schema ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link ArrayPrimitiveDataTypes} to a
+   * {@link Row}.
+   */
+  public static SerializableFunction<ArrayPrimitiveDataTypes, Row>
+      arrayPrimitiveDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * ArrayPrimitiveDataTypes}.
+   */
+  public static SerializableFunction<Row, ArrayPrimitiveDataTypes>
+      arrayPrimitiveDataTypesFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<SinglyNestedDataTypes>
+      SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(SinglyNestedDataTypes.class);
+
+  /** The schema for {@link SinglyNestedDataTypes}. */
+  public static final Schema SINGLY_NESTED_DATA_TYPES_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link SinglyNestedDataTypes} to a
+   * {@link Row}.
+   */
+  public static SerializableFunction<SinglyNestedDataTypes, Row> singlyNestedDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * SinglyNestedDataTypes}.
+   */
+  public static SerializableFunction<Row, SinglyNestedDataTypes> singlyNestedDataTypesFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  private static final TypeDescriptor<DoublyNestedDataTypes>
+      DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(DoublyNestedDataTypes.class);
+
+  /** The schema for {@link DoublyNestedDataTypes}. */
+  public static final Schema DOUBLY_NESTED_DATA_TYPES_SCHEMA =
+      DEFAULT_SCHEMA_PROVIDER.schemaFor(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link DoublyNestedDataTypes} to a
+   * {@link Row}.
+   */
+  public static SerializableFunction<DoublyNestedDataTypes, Row> doublyNestedDataTypesToRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.toRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+   * DoublyNestedDataTypes}.
+   */
+  public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataTypesFromRowFn() {
+    return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+  }
+
+  /**
+   * Contains all primitive Java types i.e. String, Integer, etc and {@link BigDecimal}. The purpose
+   * of this class is to test schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AllPrimitiveDataTypes implements Serializable {
+
+    public abstract Boolean getABoolean();
+
+    public abstract Byte getAByte();
+
+    public abstract BigDecimal getADecimal();
+
+    public abstract Double getADouble();
+
+    public abstract Float getAFloat();
+
+    public abstract Short getAShort();
+
+    public abstract Integer getAnInteger();
+
+    public abstract Long getALong();
+
+    public abstract String getAString();
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setABoolean(Boolean value);
+
+      public abstract Builder setAByte(Byte value);
+
+      public abstract Builder setADecimal(BigDecimal value);
+
+      public abstract Builder setADouble(Double value);
+
+      public abstract Builder setAFloat(Float value);
+
+      public abstract Builder setAShort(Short value);
+
+      public abstract Builder setAnInteger(Integer value);
+
+      public abstract Builder setALong(Long value);
+
+      public abstract Builder setAString(String value);
+
+      public abstract AllPrimitiveDataTypes build();
+    }
+  }
+
+  /**
+   * Contains all nullable primitive Java types i.e. String, Integer, etc and {@link BigDecimal}.
+   * The purpose of this class is to test schema-aware PTransforms with flat {@link Schema} {@link
+   * Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class NullableAllPrimitiveDataTypes implements Serializable {
+
+    @Nullable
+    public abstract Boolean getABoolean();
+
+    @Nullable
+    public abstract Double getADouble();
+
+    @Nullable
+    public abstract Float getAFloat();
+
+    @Nullable
+    public abstract Integer getAnInteger();
+
+    @Nullable
+    public abstract Long getALong();
+
+    @Nullable
+    public abstract String getAString();
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setABoolean(Boolean value);
+
+      public abstract Builder setADouble(Double value);
+
+      public abstract Builder setAFloat(Float value);
+
+      public abstract Builder setAnInteger(Integer value);
+
+      public abstract Builder setALong(Long value);
+
+      public abstract Builder setAString(String value);
+
+      public abstract NullableAllPrimitiveDataTypes build();
+    }
+  }
+
+  /**
+   * Contains time-related types. The purpose of this class is to test schema-aware PTransforms with
+   * time-related {@link Schema.FieldType} containing {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class TimeContaining {
+
+    public abstract Instant getInstant();
+
+    public abstract List<Instant> getInstantList();
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setInstant(Instant value);
+
+      public abstract Builder setInstantList(List<Instant> value);
+
+      public abstract TimeContaining build();
+    }
+  }
+
+  /**
+   * Contains arrays of all primitive Java types i.e. String, Integer, etc and {@link BigDecimal}.
+   * The purpose of this class is to test schema-aware PTransforms with {@link Row}s containing
+   * repeated primitive Java types.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class ArrayPrimitiveDataTypes {
+
+    public abstract List<Boolean> getBooleanList();
+
+    public abstract List<Double> getDoubleList();
+
+    public abstract List<Float> getFloatList();
+
+    public abstract List<Short> getShortList();
+
+    public abstract List<Integer> getIntegerList();
+
+    public abstract List<Long> getLongList();
+
+    public abstract List<String> getStringList();
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setBooleanList(List<Boolean> value);
+
+      public abstract Builder setDoubleList(List<Double> value);
+
+      public abstract Builder setFloatList(List<Float> value);
+
+      public abstract Builder setShortList(List<Short> value);
+
+      public abstract Builder setIntegerList(List<Integer> value);
+
+      public abstract Builder setLongList(List<Long> value);
+
+      public abstract Builder setStringList(List<String> value);
+
+      public abstract ArrayPrimitiveDataTypes build();
+    }
+  }
+
+  /**
+   * Contains a singly nested and repeated {@link AllPrimitiveDataTypes}. The purpose of this class
+   * is to test schema-aware PTransforms with {@link Row}s containing nested and repeated complex
+   * Java types.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SinglyNestedDataTypes {
+
+    public abstract AllPrimitiveDataTypes getAllPrimitiveDataTypes();
+
+    public abstract List<AllPrimitiveDataTypes> getAllPrimitiveDataTypesList();
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setAllPrimitiveDataTypes(AllPrimitiveDataTypes value);
+
+      public abstract Builder setAllPrimitiveDataTypesList(List<AllPrimitiveDataTypes> value);
+
+      public abstract SinglyNestedDataTypes build();
+    }
+  }
+
+  /**
+   * Contains a nested and repeated {@link SinglyNestedDataTypes}. The purpose of this class is to
+   * test schema-aware PTransforms with {@link Row}s containing deeper nested and repeated complex
+   * Java types.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class DoublyNestedDataTypes {
+
+    public abstract SinglyNestedDataTypes getSinglyNestedDataTypes();
+
+    public abstract List<SinglyNestedDataTypes> getSinglyNestedDataTypesList();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setSinglyNestedDataTypes(SinglyNestedDataTypes value);
+
+      public abstract Builder setSinglyNestedDataTypesList(List<SinglyNestedDataTypes> value);
+
+      public abstract DoublyNestedDataTypes build();
+    }
+  }
+}
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java
new file mode 100644
index 00000000000..76de0b75501
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ArrayPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DoublyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SinglyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SchemaAwareJavaBeans}. */
+@RunWith(JUnit4.class)
+public class SchemaAwareJavaBeansTest {
+  @Test
+  public void allPrimitiveDataTypesRowFns() {
+    AllPrimitiveDataTypes element =
+        allPrimitiveDataTypes(
+            false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+
+    Row row = allPrimitiveDataTypesToRowFn().apply(element);
+    assertEquals(element, allPrimitiveDataTypesFromRowFn().apply(row));
+  }
+
+  @Test
+  public void nullableAllPrimitiveDataTypesRowFns() {
+    NullableAllPrimitiveDataTypes allNull =
+        nullableAllPrimitiveDataTypes(null, null, null, null, null, null);
+    Row allNullRow = nullableAllPrimitiveDataTypesToRowFn().apply(allNull);
+    assertEquals(allNull, nullableAllPrimitiveDataTypesFromRowFn().apply(allNullRow));
+
+    NullableAllPrimitiveDataTypes nonNull =
+        nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, "a");
+    Row nonNullRow = nullableAllPrimitiveDataTypesToRowFn().apply(nonNull);
+    assertEquals(nonNull, nullableAllPrimitiveDataTypesFromRowFn().apply(nonNullRow));
+  }
+
+  @Test
+  public void timeContainingRowFns() {
+    TimeContaining element =
+        timeContaining(
+            Instant.ofEpochMilli(1L),
+            Arrays.asList(
+                Instant.ofEpochMilli(2L), Instant.ofEpochMilli(3L), Instant.ofEpochMilli(4L)));
+    Row row = timeContainingToRowFn().apply(element);
+    assertEquals(element, timeContainingFromRowFn().apply(row));
+  }
+
+  @Test
+  public void arrayPrimitiveDataTypesRowFns() {
+    ArrayPrimitiveDataTypes allEmpty =
+        arrayPrimitiveDataTypes(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList());
+    Row allEmptyRow = arrayPrimitiveDataTypesToRowFn().apply(allEmpty);
+    assertEquals(allEmpty, arrayPrimitiveDataTypesFromRowFn().apply(allEmptyRow));
+
+    ArrayPrimitiveDataTypes noneEmpty =
+        arrayPrimitiveDataTypes(
+            Stream.generate(() -> true).limit(Short.MAX_VALUE).collect(Collectors.toList()),
+            Stream.generate(() -> Double.MIN_VALUE)
+                .limit(Short.MAX_VALUE)
+                .collect(Collectors.toList()),
+            Stream.generate(() -> Float.MIN_VALUE)
+                .limit(Short.MAX_VALUE)
+                .collect(Collectors.toList()),
+            Stream.generate(() -> Short.MIN_VALUE)
+                .limit(Short.MAX_VALUE)
+                .collect(Collectors.toList()),
+            Stream.generate(() -> Integer.MIN_VALUE)
+                .limit(Short.MAX_VALUE)
+                .collect(Collectors.toList()),
+            Stream.generate(() -> Long.MIN_VALUE)
+                .limit(Short.MAX_VALUE)
+                .collect(Collectors.toList()),
+            Stream.generate(() -> "🐿").limit(Short.MAX_VALUE).collect(Collectors.toList()));
+    Row noneEmptyRow = arrayPrimitiveDataTypesToRowFn().apply(noneEmpty);
+    assertEquals(noneEmpty, arrayPrimitiveDataTypesFromRowFn().apply(noneEmptyRow));
+  }
+
+  @Test
+  public void singlyNestedDataTypesRowFns() {
+    AllPrimitiveDataTypes element =
+        allPrimitiveDataTypes(
+            false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+    SinglyNestedDataTypes notRepeated = singlyNestedDataTypes(element);
+    SinglyNestedDataTypes repeated = singlyNestedDataTypes(element, element, element, element);
+    Row notRepeatedRow = singlyNestedDataTypesToRowFn().apply(notRepeated);
+    Row repeatedRow = singlyNestedDataTypesToRowFn().apply(repeated);
+    assertEquals(notRepeated, singlyNestedDataTypesFromRowFn().apply(notRepeatedRow));
+    assertEquals(repeated, singlyNestedDataTypesFromRowFn().apply(repeatedRow));
+  }
+
+  @Test
+  public void doublyNestedDataTypesRowFns() {
+    AllPrimitiveDataTypes element =
+        allPrimitiveDataTypes(
+            false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+    DoublyNestedDataTypes d0s0 = doublyNestedDataTypes(singlyNestedDataTypes(element));
+    DoublyNestedDataTypes d1s0 =
+        doublyNestedDataTypes(
+            singlyNestedDataTypes(element),
+            singlyNestedDataTypes(element),
+            singlyNestedDataTypes(element),
+            singlyNestedDataTypes(element));
+    DoublyNestedDataTypes d0s1 =
+        doublyNestedDataTypes(singlyNestedDataTypes(element, element, element, element));
+    DoublyNestedDataTypes d1s1 =
+        doublyNestedDataTypes(
+            singlyNestedDataTypes(element, element, element, element),
+            singlyNestedDataTypes(element, element, element, element),
+            singlyNestedDataTypes(element, element, element, element),
+            singlyNestedDataTypes(element, element, element, element));
+
+    Row d0s0Row = doublyNestedDataTypesToRowFn().apply(d0s0);
+    Row d1s0Row = doublyNestedDataTypesToRowFn().apply(d1s0);
+    Row d0s1Row = doublyNestedDataTypesToRowFn().apply(d0s1);
+    Row d1s1Row = doublyNestedDataTypesToRowFn().apply(d1s1);
+
+    assertEquals(d0s0, doublyNestedDataTypesFromRowFn().apply(d0s0Row));
+    assertEquals(d1s0, doublyNestedDataTypesFromRowFn().apply(d1s0Row));
+    assertEquals(d0s1, doublyNestedDataTypesFromRowFn().apply(d0s1Row));
+    assertEquals(d1s1, doublyNestedDataTypesFromRowFn().apply(d1s1Row));
+  }
+}
diff --git a/sdks/java/io/fileschematransform/OWNERS b/sdks/java/io/file-schema-transform/OWNERS
similarity index 55%
rename from sdks/java/io/fileschematransform/OWNERS
rename to sdks/java/io/file-schema-transform/OWNERS
index c207accc003..aed54489c35 100644
--- a/sdks/java/io/fileschematransform/OWNERS
+++ b/sdks/java/io/file-schema-transform/OWNERS
@@ -1,5 +1,8 @@
 # See the OWNERS docs at https://s.apache.org/beam-owners
+# Reviewers below listed in alphabetic order
 
 reviewers:
+  - ahmedabu98
+  - damondouglas
   - johnjcasey
   - pabloem
diff --git a/sdks/java/io/fileschematransform/build.gradle b/sdks/java/io/file-schema-transform/build.gradle
similarity index 70%
rename from sdks/java/io/fileschematransform/build.gradle
rename to sdks/java/io/file-schema-transform/build.gradle
index f443dca13c0..ac650692f2d 100644
--- a/sdks/java/io/fileschematransform/build.gradle
+++ b/sdks/java/io/file-schema-transform/build.gradle
@@ -32,15 +32,23 @@ configurations.implementation {
     }
 }
 
-// exclude auto-generated classes and integration tests
-def jacocoExcludes = [
-        '**/AutoValue_*',
-        '**/*IT*',
-]
+def parquet_version = "1.12.0"
 
 dependencies {
-    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.avro
     implementation library.java.commons_csv
+    implementation library.java.jaxb_api
+    implementation library.java.joda_time
+    implementation library.java.vendored_guava_26_0_jre
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(path: ":sdks:java:io:parquet")
+    implementation "org.apache.parquet:parquet-common:$parquet_version"
+    implementation project(path: ":sdks:java:io:xml")
+
     testImplementation library.java.junit
     testImplementation project(path: ":sdks:java:core", configuration: "shadow")
+    testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
+    testImplementation project(path: ":sdks:java:io:parquet")
+    testImplementation project(path: ":sdks:java:io:xml")
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
 }
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..75cf841beab
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.getNumShards;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.getShardNameTemplate;
+
+import com.google.auto.service.AutoService;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for avro format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class AvroWriteSchemaTransformFormatProvider
+    implements FileWriteSchemaTransformFormatProvider {
+
+  @Override
+  public String identifier() {
+    return FileWriteSchemaTransformFormatProviders.AVRO;
+  }
+
+  /**
+   * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into result
+   * {@link PCollection} file names written using {@link AvroIO.Write}.
+   */
+  @Override
+  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+
+    return new PTransform<PCollection<Row>, PCollection<String>>() {
+      @Override
+      public PCollection<String> expand(PCollection<Row> input) {
+
+        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+        AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+
+        PCollection<GenericRecord> avro =
+            input
+                .apply(
+                    "Row To Avro Generic Record",
+                    FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
+                .setCoder(coder);
+
+        AvroIO.Write<GenericRecord> write =
+            AvroIO.writeGenericRecords(avroSchema).to(configuration.getFilenamePrefix());
+
+        if (configuration.getNumShards() != null) {
+          int numShards = getNumShards(configuration);
+          // Python SDK external transforms do not support null values requiring additional check.
+          if (numShards > 0) {
+            write = write.withNumShards(numShards);
+          }
+        }
+
+        if (!Strings.isNullOrEmpty(configuration.getShardNameTemplate())) {
+          write = write.withShardNameTemplate(getShardNameTemplate(configuration));
+        }
+
+        return avro.apply("Write Avro", write.withOutputFilenames())
+            .getPerDestinationOutputFilenames()
+            .apply("perDestinationOutputFilenames", Values.create());
+      }
+    };
+  }
+}
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
similarity index 56%
copy from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
copy to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
index a6db5ffabe9..5c6b803cde1 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
@@ -17,21 +17,26 @@
  */
 package org.apache.beam.sdk.io.fileschematransform;
 
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.io.Providers;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
 
-/**
- * Provides a {@link PTransform} that consumes a {@link PCollection} according to a registered
- * {@link com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
- * implementation. See {@link FileWriteSchemaTransformFormatProviders} for a list of available
- * formats.
- */
-@Internal
-public interface FileWriteSchemaTransformFormatProvider extends Providers.Identifyable {
+/** A {@link FileWriteSchemaTransformFormatProvider} for CSV format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class CsvWriteSchemaTransformFormatProvider
+    implements FileWriteSchemaTransformFormatProvider {
+
+  @Override
+  public String identifier() {
+    return FileWriteSchemaTransformFormatProviders.CSV;
+  }
 
-  /** Builds a {@link PTransform} that consumes {@link PCollection}. */
-  PTransform<PCollection<?>, PDone> buildTransform();
+  @Override
+  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+    // TODO(https://github.com/apache/beam/issues/24469)
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
similarity index 77%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
index 10af9b99430..160d51dd293 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.fileschematransform;
 
 import com.google.auto.value.AutoValue;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -46,22 +48,22 @@ public abstract class FileWriteSchemaTransformConfiguration {
   }
 
   public static XmlConfiguration.Builder xmlConfigurationBuilder() {
-    return new AutoValue_FileWriteSchemaTransformConfiguration_XmlConfiguration.Builder();
+    return new AutoValue_FileWriteSchemaTransformConfiguration_XmlConfiguration.Builder()
+        .setCharset(StandardCharsets.UTF_8.name());
   }
 
   /**
-   * The format of the file content. Used as {@link String} key lookup of {@link
+   * The format of the file content. Used as String key lookup of {@link
    * FileWriteSchemaTransformFormatProviders#loadProviders()}.
    */
   public abstract String getFormat();
 
-  /** Specifies a common prefix to use for all generated filenames. */
+  /** A common prefix to use for all generated filenames. */
   public abstract String getFilenamePrefix();
 
   /**
-   * Specifies to compress all generated shard files by default, append the respective extension to
-   * the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values, though
-   * stringified in all lowercase format.
+   * The compression of all generated shard files. By default, appends the respective extension to
+   * the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values.
    */
   @Nullable
   public abstract String getCompression();
@@ -90,22 +92,20 @@ public abstract class FileWriteSchemaTransformConfiguration {
   @Nullable
   public abstract XmlConfiguration getXmlConfiguration();
 
+  abstract Builder toBuilder();
+
   @AutoValue.Builder
   public abstract static class Builder {
 
-    /**
-     * The format of the file content. Used as {@link String} key lookup of {@link
-     * FileWriteSchemaTransformFormatProviders#loadProviders()}.
-     */
+    /** The format of the file content. See {@link #getFormat()} for more details. */
     public abstract Builder setFormat(String value);
 
-    /** Specifies a common prefix to use for all generated filenames. */
+    /** A common prefix to use for all generated filenames. */
     public abstract Builder setFilenamePrefix(String value);
 
     /**
-     * Specifies to compress all generated shard files by default, append the respective extension
-     * to the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values, though
-     * stringified in all lowercase format.
+     * The file {@link org.apache.beam.sdk.io.Compression} See {@link #getCompression()} for more
+     * details.
      */
     public abstract Builder setCompression(String value);
 
@@ -135,23 +135,10 @@ public abstract class FileWriteSchemaTransformConfiguration {
   @AutoValue
   public abstract static class CsvConfiguration {
 
-    /**
-     * Not to be confused with the CSV header, it is content written to the top of every sharded
-     * file prior to the header. In the example below, all the text proceeding the header
-     * 'column1,column2,column3' is the preamble.
-     *
-     * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator:
-     * John Doe
-     *
-     * <p>column1,column2,column3 1,2,3 4,5,6
-     */
-    @Nullable
-    public abstract String getPreamble();
-
     /**
      * The format of the written CSV file. See <a
      * href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html">org.apache.commons.csv.CSVFormat</a>
-     * for allowed values, stringified in lowercase. Defaults to <a
+     * for allowed values. Defaults to <a
      * href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html#DEFAULT">CSVFormat.DEFAULT</a>
      */
     public abstract String getCsvFormat();
@@ -159,22 +146,10 @@ public abstract class FileWriteSchemaTransformConfiguration {
     @AutoValue.Builder
     public abstract static class Builder {
 
-      /**
-       * Not to be confused with the CSV header, it is content written to the top of every sharded
-       * file prior to the header. In the example below, all the text proceeding the header
-       * 'column1,column2,column3' is the preamble.
-       *
-       * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator:
-       * John Doe
-       *
-       * <p>column1,column2,column3 1,2,3 4,5,6
-       */
-      public abstract Builder setPreamble(String value);
-
       /**
        * The format of the written CSV file. See <a
        * href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html">org.apache.commons.csv.CSVFormat</a>
-       * for allowed values, stringified in lowercase. Defaults to <a
+       * for allowed values. Defaults to <a
        * href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html#DEFAULT">CSVFormat.DEFAULT</a>
        */
       public abstract Builder setCsvFormat(String value);
@@ -190,11 +165,12 @@ public abstract class FileWriteSchemaTransformConfiguration {
 
     /**
      * Specifies compression codec. See org.apache.parquet.hadoop.metadata.CompressionCodecName for
-     * allowed names, stringified in lowercase format.
+     * allowed names.
      */
     public abstract String getCompressionCodecName();
 
     /** Specify row-group size; if not set or zero, a default is used by the underlying writer. */
+    @Nullable
     public abstract Integer getRowGroupSize();
 
     @AutoValue.Builder
@@ -202,7 +178,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
 
       /**
        * Specifies compression codec. See org.apache.parquet.hadoop.metadata.CompressionCodecName
-       * for allowed names, stringified in lowercase format.
+       * for allowed names.
        */
       public abstract Builder setCompressionCodecName(String value);
 
@@ -223,7 +199,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
 
     /**
      * The charset used to write the file. Defaults to {@link
-     * java.nio.charset.StandardCharsets#UTF_8}.
+     * java.nio.charset.StandardCharsets#UTF_8}'s {@link Charset#name()}.
      */
     public abstract String getCharset();
 
@@ -235,7 +211,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
 
       /**
        * The charset used to write the file. Defaults to {@link
-       * java.nio.charset.StandardCharsets#UTF_8}.
+       * java.nio.charset.StandardCharsets#UTF_8}'s {@link Charset#name()}.
        */
       public abstract Builder setCharset(String value);
 
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
similarity index 66%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
index a6db5ffabe9..6eaec310cd2 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
@@ -18,20 +18,26 @@
 package org.apache.beam.sdk.io.fileschematransform;
 
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.Providers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
 
 /**
- * Provides a {@link PTransform} that consumes a {@link PCollection} according to a registered
- * {@link com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
+ * Provides a {@link PTransform} that writes a {@link PCollection} of {@link Row}s and outputs a
+ * {@link PCollection} of the file names according to a registered {@link
+ * com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
  * implementation. See {@link FileWriteSchemaTransformFormatProviders} for a list of available
  * formats.
  */
 @Internal
 public interface FileWriteSchemaTransformFormatProvider extends Providers.Identifyable {
 
-  /** Builds a {@link PTransform} that consumes {@link PCollection}. */
-  PTransform<PCollection<?>, PDone> buildTransform();
+  /**
+   * Builds a {@link PTransform} that writes a {@link Row} {@link PCollection} and outputs the
+   * resulting {@link PCollection} of the file names.
+   */
+  PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema);
 }
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
new file mode 100644
index 00000000000..8ed73a6b11d
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * {@link FileWriteSchemaTransformFormatProviders} contains {@link
+ * FileWriteSchemaTransformFormatProvider} implementations.
+ *
+ * <p>The design goals of this class are to enable clean {@link
+ * FileWriteSchemaTransformConfiguration#getFormat()} lookups mapping to the appropriate {@link
+ * org.apache.beam.sdk.io.FileIO.Write} that encodes the file data into the configured format.
+ */
+@Internal
+public final class FileWriteSchemaTransformFormatProviders {
+  static final String AVRO = "avro";
+  static final String CSV = "csv";
+  static final String JSON = "json";
+  static final String PARQUET = "parquet";
+  static final String XML = "xml";
+
+  /** Load all {@link FileWriteSchemaTransformFormatProvider} implementations. */
+  public static Map<String, FileWriteSchemaTransformFormatProvider> loadProviders() {
+    return Providers.loadProviders(FileWriteSchemaTransformFormatProvider.class);
+  }
+
+  /** Builds a {@link MapElements}i transform to map {@link Row}s to {@link GenericRecord}s. */
+  static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema) {
+    return MapElements.into(TypeDescriptor.of(GenericRecord.class))
+        .via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
+  }
+
+  /**
+   * Applies common parameters from {@link FileWriteSchemaTransformConfiguration} to {@link
+   * FileIO.Write}.
+   */
+  static <T> FileIO.Write<Void, T> applyCommonFileIOWriteFeatures(
+      FileIO.Write<Void, T> write, FileWriteSchemaTransformConfiguration configuration) {
+
+    if (!Strings.isNullOrEmpty(configuration.getFilenameSuffix())) {
+      write = write.withSuffix(getFilenameSuffix(configuration));
+    }
+
+    if (configuration.getNumShards() != null) {
+      int numShards = getNumShards(configuration);
+      // Python SDK external transforms do not support null values requiring additional check.
+      if (numShards > 0) {
+        write = write.withNumShards(numShards);
+      }
+    }
+
+    if (!Strings.isNullOrEmpty(configuration.getCompression())) {
+      write = write.withCompression(getCompression(configuration));
+    }
+
+    return write;
+  }
+
+  /**
+   * Applies common parameters from {@link FileWriteSchemaTransformConfiguration} to {@link
+   * TextIO.Write}.
+   */
+  static TextIO.Write applyCommonTextIOWriteFeatures(
+      TextIO.Write write, FileWriteSchemaTransformConfiguration configuration) {
+    write = write.to(configuration.getFilenamePrefix());
+
+    if (!Strings.isNullOrEmpty(configuration.getFilenameSuffix())) {
+      write = write.withSuffix(getFilenameSuffix(configuration));
+    }
+
+    if (!Strings.isNullOrEmpty(configuration.getCompression())) {
+      write = write.withCompression(getCompression(configuration));
+    }
+
+    if (configuration.getNumShards() != null) {
+      int numShards = getNumShards(configuration);
+      // Python SDK external transforms do not support null values requiring additional check.
+      if (numShards > 0) {
+        write = write.withNumShards(numShards);
+      }
+    }
+
+    if (!Strings.isNullOrEmpty(configuration.getShardNameTemplate())) {
+      write = write.withShardNameTemplate(getShardNameTemplate(configuration));
+    }
+
+    return write;
+  }
+
+  private static Compression getCompression(FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework incompatible argument for valueOf parameter
+    Optional<String> compression = Optional.ofNullable(configuration.getCompression());
+    checkState(compression.isPresent());
+    return Compression.valueOf(compression.get());
+  }
+
+  private static String getFilenameSuffix(FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework incompatible argument for parameter suffix of withSuffix
+    Optional<String> suffix = Optional.ofNullable(configuration.getFilenameSuffix());
+    checkState(suffix.isPresent());
+    return suffix.get();
+  }
+
+  static Integer getNumShards(FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework unboxing a possibly-null reference
+    Optional<Integer> numShards = Optional.ofNullable(configuration.getNumShards());
+    checkState(numShards.isPresent());
+    return numShards.get();
+  }
+
+  static String getShardNameTemplate(FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework incompatible null argument for parameter shardTemplate
+    Optional<String> shardNameTemplate = Optional.ofNullable(configuration.getShardNameTemplate());
+    checkState(shardNameTemplate.isPresent());
+    return shardNameTemplate.get();
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..ced21527a2f
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.CSV;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * A {@link TypedSchemaTransformProvider} implementation for writing a {@link Row} {@link
+ * PCollection} to file systems, driven by a {@link FileWriteSchemaTransformConfiguration}.
+ */
+public class FileWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileWriteSchemaTransformConfiguration> {
+
+  public static final Field FILE_NAME_FIELD = Field.of("fileName", FieldType.STRING);
+  public static final Schema OUTPUT_SCHEMA = Schema.of(FILE_NAME_FIELD);
+
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_write:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  /** Provides the required {@link TypedSchemaTransformProvider#configurationClass()}. */
+  @Override
+  protected Class<FileWriteSchemaTransformConfiguration> configurationClass() {
+    return FileWriteSchemaTransformConfiguration.class;
+  }
+
+  /** Builds a {@link SchemaTransform} from a {@link FileWriteSchemaTransformConfiguration}. */
+  @Override
+  protected SchemaTransform from(FileWriteSchemaTransformConfiguration configuration) {
+    return new FileWriteSchemaTransform(configuration);
+  }
+
+  /** Returns the {@link TypedSchemaTransformProvider#identifier()} required for registration. */
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  /** The expected {@link PCollectionRowTuple} input tags. */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /** The expected {@link PCollectionRowTuple} output tags. */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * A {@link PTransform} that converts a {@link PCollectionRowTuple} of {@link
+   * #inputCollectionNames()} tagged {@link Row}s into a {@link PCollectionRowTuple} of {@link
+   * #outputCollectionNames()} tagged {@link Row}s.
+   */
+  static class FileWriteSchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple>
+      implements SchemaTransform {
+
+    final FileWriteSchemaTransformConfiguration configuration;
+
+    FileWriteSchemaTransform(FileWriteSchemaTransformConfiguration configuration) {
+      validateConfiguration(configuration);
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (input.getAll().isEmpty() || input.getAll().size() > 1) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s expects a single %s tagged PCollection<Row> input",
+                FileWriteSchemaTransform.class.getName(), INPUT_TAG));
+      }
+
+      PCollection<Row> rowInput = input.get(INPUT_TAG);
+
+      PTransform<PCollection<Row>, PCollection<String>> transform =
+          getProvider().buildTransform(configuration, rowInput.getSchema());
+
+      PCollection<String> files = rowInput.apply("Write Rows", transform);
+      PCollection<Row> output =
+          files.apply(
+              "Filenames to Rows",
+              MapElements.into(rows())
+                  .via(
+                      (String name) ->
+                          Row.withSchema(OUTPUT_SCHEMA)
+                              .withFieldValue(FILE_NAME_FIELD.getName(), name)
+                              .build()));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return this;
+    }
+
+    /**
+     * A helper method to retrieve the mapped {@link FileWriteSchemaTransformFormatProvider} from a
+     * {@link FileWriteSchemaTransformConfiguration#getFormat()}.
+     */
+    FileWriteSchemaTransformFormatProvider getProvider() {
+      Map<String, FileWriteSchemaTransformFormatProvider> providers =
+          FileWriteSchemaTransformFormatProviders.loadProviders();
+      if (!providers.containsKey(configuration.getFormat())) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s is not a supported format. See %s for a list of supported formats.",
+                configuration.getFormat(),
+                FileWriteSchemaTransformFormatProviders.class.getName()));
+      }
+      // resolves [dereference.of.nullable]
+      Optional<FileWriteSchemaTransformFormatProvider> provider =
+          Optional.ofNullable(providers.get(configuration.getFormat()));
+      checkState(provider.isPresent());
+      return provider.get();
+    }
+
+    /**
+     * Validates a {@link FileWriteSchemaTransformConfiguration} for correctness depending on its
+     * {@link FileWriteSchemaTransformConfiguration#getFormat()}.
+     */
+    static void validateConfiguration(FileWriteSchemaTransformConfiguration configuration) {
+      String format = configuration.getFormat();
+      if (configuration.getCsvConfiguration() != null && !format.equals(CSV)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "configuration with %s is not compatible with a %s format",
+                FileWriteSchemaTransformConfiguration.CsvConfiguration.class.getName(), format));
+      }
+      if (configuration.getParquetConfiguration() != null && !format.equals(PARQUET)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "configuration with %s is not compatible with a %s format",
+                FileWriteSchemaTransformConfiguration.ParquetConfiguration.class.getName(),
+                format));
+      }
+      if (configuration.getXmlConfiguration() != null && !format.equals(XML)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "configuration with %s is not compatible with a %s format",
+                FileWriteSchemaTransformConfiguration.XmlConfiguration.class.getName(), format));
+      }
+      if (format.equals(AVRO) && !Strings.isNullOrEmpty(configuration.getCompression())) {
+        throw new IllegalArgumentException(
+            "configuration with compression is not compatible with AvroIO");
+      }
+      if (format.equals(PARQUET) && !Strings.isNullOrEmpty(configuration.getCompression())) {
+        throw new IllegalArgumentException(
+            "configuration with compression is not compatible with ParquetIO");
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..68f12e3d26b
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonTextIOWriteFeatures;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for JSON format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class JsonWriteSchemaTransformFormatProvider
+    implements FileWriteSchemaTransformFormatProvider {
+
+  final String suffix = String.format(".%s", FileWriteSchemaTransformFormatProviders.JSON);
+
+  @Override
+  public String identifier() {
+    return FileWriteSchemaTransformFormatProviders.JSON;
+  }
+
+  /**
+   * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into result
+   * {@link PCollection} file names written using {@link TextIO.Write}.
+   */
+  @Override
+  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+    return new PTransform<PCollection<Row>, PCollection<String>>() {
+      @Override
+      public PCollection<String> expand(PCollection<Row> input) {
+
+        PCollection<String> json = input.apply("Row To Json", mapRowsToJsonStrings(schema));
+
+        TextIO.Write write =
+            TextIO.write().to(configuration.getFilenamePrefix()).withSuffix(suffix);
+
+        write = applyCommonTextIOWriteFeatures(write, configuration);
+
+        return json.apply("Write Json", write.withOutputFilenames())
+            .getPerDestinationOutputFilenames()
+            .apply("perDestinationOutputFilenames", Values.create());
+      }
+    };
+  }
+
+  /** Builds a {@link MapElements} transform to map {@link Row} to JSON strings. */
+  MapElements<Row, String> mapRowsToJsonStrings(Schema schema) {
+    return MapElements.into(strings()).via(new RowToJsonFn(schema));
+  }
+
+  private static class RowToJsonFn implements SerializableFunction<Row, String> {
+
+    private final PayloadSerializer payloadSerializer;
+
+    RowToJsonFn(Schema schema) {
+      payloadSerializer =
+          new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of());
+    }
+
+    @Override
+    public String apply(Row input) {
+      return new String(payloadSerializer.serialize(input), StandardCharsets.UTF_8);
+    }
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..6981844d229
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonFileIOWriteFeatures;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.service.AutoService;
+import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.ParquetConfiguration;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for Parquet format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class ParquetWriteSchemaTransformFormatProvider
+    implements FileWriteSchemaTransformFormatProvider {
+
+  private static final String SUFFIX =
+      String.format(".%s", FileWriteSchemaTransformFormatProviders.PARQUET);
+
+  @Override
+  public String identifier() {
+    return FileWriteSchemaTransformFormatProviders.PARQUET;
+  }
+
+  /**
+   * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into result
+   * {@link PCollection} file names written using {@link ParquetIO.Sink} and {@link FileIO.Write}.
+   */
+  @Override
+  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+    return new PTransform<PCollection<Row>, PCollection<String>>() {
+      @Override
+      public PCollection<String> expand(PCollection<Row> input) {
+        org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+        AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+
+        FileIO.Write<Void, GenericRecord> write =
+            FileIO.<GenericRecord>write()
+                .to(configuration.getFilenamePrefix())
+                .via(buildSink(parquetConfiguration(configuration), schema))
+                .withSuffix(SUFFIX);
+
+        write = applyCommonFileIOWriteFeatures(write, configuration);
+
+        return input
+            .apply(
+                "Row To GenericRecord",
+                FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
+            .setCoder(coder)
+            .apply("Write Parquet", write)
+            .getPerDestinationOutputFilenames()
+            .apply("perDestinationOutputFilenames", Values.create());
+      }
+    };
+  }
+
+  private ParquetIO.Sink buildSink(ParquetConfiguration configuration, Schema schema) {
+
+    ParquetIO.Sink sink =
+        ParquetIO.sink(AvroUtils.toAvroSchema(schema))
+            .withCompressionCodec(
+                CompressionCodecName.valueOf(configuration.getCompressionCodecName()));
+
+    if (configuration.getRowGroupSize() != null) {
+      int rowGroupSize = getRowGroupSize(configuration);
+      // Python SDK external transforms do not support null values requiring additional check.
+      if (rowGroupSize > 0) {
+        sink = sink.withRowGroupSize(rowGroupSize);
+      }
+    }
+
+    return sink;
+  }
+
+  private static ParquetConfiguration parquetConfiguration(
+      FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework incompatible argument for requireNonNull parameter
+    Optional<ParquetConfiguration> parquetConfiguration =
+        Optional.ofNullable(configuration.getParquetConfiguration());
+    checkState(parquetConfiguration.isPresent());
+    return parquetConfiguration.get();
+  }
+
+  private static Integer getRowGroupSize(ParquetConfiguration configuration) {
+    // resolves Checker Framework [unboxing.of.nullable] unboxing a possibly-null reference
+    Optional<Integer> rowGroupSize = Optional.ofNullable(configuration.getRowGroupSize());
+    checkState(rowGroupSize.isPresent());
+    return rowGroupSize.get();
+  }
+}
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
similarity index 51%
copy from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
copy to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
index 6081e783ec2..77457bc8307 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
@@ -17,27 +17,25 @@
  */
 package org.apache.beam.sdk.io.fileschematransform;
 
-import static org.junit.Assert.assertTrue;
+import java.io.Serializable;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
 
-import java.util.Map;
-import java.util.Set;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+/** An {@link XmlAdapter} for {@link DateTime}s. */
+class XmlDateTimeAdapter extends XmlAdapter<String, DateTime> implements Serializable {
+  private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTime();
 
-/** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
-@RunWith(JUnit4.class)
-public class FileWriteSchemaTransformFormatProvidersTest {
+  /** Converts a String into {@link DateTime} based on {@link ISODateTimeFormat}. */
+  @Override
+  public DateTime unmarshal(String v) throws Exception {
+    return DateTime.parse(v, FORMATTER);
+  }
 
-  @Test
-  public void testLoadProviders() {
-    Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
-        FileWriteSchemaTransformFormatProviders.loadProviders();
-    Set<String> keys = formatProviderMap.keySet();
-    assertTrue(keys.contains("avro"));
-    assertTrue(keys.contains("csv"));
-    assertTrue(keys.contains("json"));
-    assertTrue(keys.contains("parquet"));
-    assertTrue(keys.contains("xml"));
+  /** Converts a {@link DateTime} into String based on {@link ISODateTimeFormat}. */
+  @Override
+  public String marshal(DateTime v) throws Exception {
+    return v.toString(FORMATTER);
   }
 }
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java
new file mode 100644
index 00000000000..0b5d859dadf
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Wraps a {@link Row} for compatible use with {@link javax.xml.bind.JAXBContext}. {@link
+ * XmlRowAdapter} allows {@link XmlWriteSchemaTransformFormatProvider} to convert {@link Row} to XML
+ * strings with no knowledge of the original Java class. {@link javax.xml.bind.Marshaller} requires
+ * Java classes to be annotated with {@link XmlRootElement}, preventing the {@link
+ * javax.xml.bind.Marshaller#marshal(Object, Writer)} of {@link Row}s directly. {@link
+ * XmlRowAdapter} exposes the String key and Object value pairs of the {@link Row} to the {@link
+ * javax.xml.bind.Marshaller}.
+ */
+@XmlRootElement(name = "row")
+@XmlAccessorType(XmlAccessType.PROPERTY)
+class XmlRowAdapter implements Serializable {
+
+  private final HashMap<String, XmlRowValue> record = new HashMap<>();
+
+  /**
+   * Wrap a {@link Row} to prepare {@link XmlRowAdapter}'s use with {@link
+   * javax.xml.bind.Marshaller}. {@link XmlRowAdapter} stores a {@link HashMap} that this method
+   * fills from {@link Row}'s String key and Object value pairs. This copying of data to the {@link
+   * HashMap} allows {@link javax.xml.bind.Marshaller} to populate the XML elements from {@link
+   * #getData()}.
+   */
+  void wrapRow(Row row) {
+    Schema schema = row.getSchema();
+    for (String key : schema.getFieldNames()) {
+      XmlRowValue value = new XmlRowValue();
+      value.setValue(key, row);
+      record.put(key, value);
+    }
+  }
+
+  /**
+   * Exposes the copied {@link Row} data to the {@link javax.xml.bind.Marshaller} via the {@link
+   * XmlElement} annotation.
+   */
+  @XmlElement
+  HashMap<String, XmlRowValue> getData() {
+    return record;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    XmlRowAdapter that = (XmlRowAdapter) o;
+    return record.equals(that.record);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(record);
+  }
+
+  @Override
+  public String toString() {
+    return "XmlRowAdapter{" + "record=" + record + '}';
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java
new file mode 100644
index 00000000000..df4392665a4
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+
+/**
+ * Implements an {@link XmlType} of {@link Row} values for compatible use with {@link
+ * javax.xml.bind.JAXBContext}. {@link XmlRowValue} allows {@link
+ * XmlWriteSchemaTransformFormatProvider} to convert {@link Row} values to XML strings with no
+ * knowledge of the original Java class. {@link #setValue(String, Row)} serves as the algorithm's
+ * entry point.
+ */
+@XmlType
+class XmlRowValue implements Serializable {
+  @Nullable private Object primitiveValue = null;
+
+  @Nullable private ArrayList<XmlRowValue> valueList = null;
+
+  @Nullable private DateTime dateTimeValue = null;
+
+  @Nullable private HashMap<String, XmlRowValue> nestedValue = null;
+
+  /**
+   * A {@link Row}'s value for a primitive type such as {@link FieldType#STRING}, {@link
+   * FieldType#DOUBLE}, etc.
+   */
+  @XmlElement(name = "value")
+  @Nullable
+  Object getPrimitiveValue() {
+    return primitiveValue;
+  }
+
+  /**
+   * A {@link Row}'s value for a {@link FieldType#DATETIME}, converted using {@link
+   * XmlDateTimeAdapter}.
+   */
+  @XmlElement(name = "value")
+  @XmlJavaTypeAdapter(XmlDateTimeAdapter.class)
+  @Nullable
+  DateTime getDateTimeValue() {
+    return dateTimeValue;
+  }
+
+  /** A {@link Row}'s value for a {@link TypeName#ARRAY} or {@link TypeName#ITERABLE} type. */
+  @XmlElement(name = "array")
+  @Nullable
+  ArrayList<XmlRowValue> getValueList() {
+    return valueList;
+  }
+
+  /** A {@link Row}'s value for a nested {@link TypeName#ROW} value. */
+  @XmlElement(name = "row")
+  @Nullable
+  HashMap<String, XmlRowValue> getNestedValue() {
+    return nestedValue;
+  }
+
+  void setPrimitiveValue(Object primitiveValue) {
+    this.primitiveValue = primitiveValue;
+  }
+
+  void setValueList(ArrayList<XmlRowValue> valueList) {
+    this.valueList = valueList;
+  }
+
+  /**
+   * The entry point for parsing a {@link Row} record and its value mapped from the key. Primitive
+   * types populate {@link #setPrimitiveValue(Object)}. {@link FieldType#DATETIME} values populate
+   * {@link #setDateTimeValue(ReadableDateTime)}. {@link TypeName#ARRAY} or {@link
+   * TypeName#ITERABLE} values populate {@link #setArrayValue(String, Field, Row)} and {@link
+   * TypeName#ROW} nested values populate {@link #setNestedValue(Row)}.
+   */
+  void setValue(String key, Row parent) {
+    Schema schema = parent.getSchema();
+    Field field = schema.getField(key);
+    FieldType fieldType = field.getType();
+    TypeName typeName = fieldType.getTypeName();
+    switch (typeName) {
+      case BOOLEAN:
+      case BYTE:
+      case DECIMAL:
+      case DOUBLE:
+      case FLOAT:
+      case INT16:
+      case INT32:
+      case INT64:
+      case STRING:
+        primitiveValue = parent.getValue(key);
+        return;
+      case ARRAY:
+      case ITERABLE:
+        setArrayValue(key, field, parent);
+        return;
+      case DATETIME:
+        setDateTimeValue(key, parent);
+        return;
+      case ROW:
+        Optional<Row> child = Optional.ofNullable(parent.getRow(key));
+        checkState(child.isPresent());
+        setNestedValue(child.get());
+        return;
+      default:
+        throw new IllegalArgumentException(
+            String.format("%s at key %s is not supported", typeName.name(), key));
+    }
+  }
+
+  private void setArrayValue(String key, Field field, Row parent) {
+    Optional<FieldType> collectionFieldType =
+        Optional.ofNullable(field.getType().getCollectionElementType());
+    checkState(collectionFieldType.isPresent());
+    TypeName collectionFieldTypeName = collectionFieldType.get().getTypeName();
+    Optional<Iterable<Object>> iterable = Optional.ofNullable(parent.getIterable(key));
+    checkState(iterable.isPresent());
+    valueList = new ArrayList<>();
+    Optional<ArrayList<XmlRowValue>> safeValueList = Optional.of(valueList);
+
+    switch (collectionFieldTypeName) {
+      case BOOLEAN:
+      case BYTE:
+      case DECIMAL:
+      case DOUBLE:
+      case FLOAT:
+      case INT16:
+      case INT32:
+      case INT64:
+      case STRING:
+        for (Object element : iterable.get()) {
+          XmlRowValue value = new XmlRowValue();
+          value.setPrimitiveValue(element);
+          safeValueList.get().add(value);
+        }
+        return;
+      case DATETIME:
+        for (Object element : iterable.get()) {
+          XmlRowValue value = new XmlRowValue();
+          value.setDateTimeValue(((Instant) element).toDateTime());
+          safeValueList.get().add(value);
+        }
+        return;
+      case ROW:
+        for (Object value : iterable.get()) {
+          Row nestedValue = (Row) value;
+          XmlRowValue element = new XmlRowValue();
+          element.setNestedValue(nestedValue);
+          safeValueList.get().add(element);
+        }
+        return;
+      default:
+        throw new IllegalArgumentException(
+            String.format("%s at key %s is not supported", collectionFieldType, key));
+    }
+  }
+
+  private void setDateTimeValue(String key, Row parent) {
+    Optional<ReadableDateTime> value = Optional.ofNullable(parent.getDateTime(key));
+    checkState(value.isPresent());
+    setDateTimeValue(value.get());
+  }
+
+  private void setDateTimeValue(ReadableDateTime value) {
+    dateTimeValue = Instant.ofEpochMilli(value.getMillis()).toDateTime();
+  }
+
+  private void setNestedValue(Row row) {
+    Schema schema = row.getSchema();
+    nestedValue = new HashMap<>();
+    Optional<HashMap<String, XmlRowValue>> safeNestedValue = Optional.of(nestedValue);
+    for (String key : schema.getFieldNames()) {
+      XmlRowValue child = new XmlRowValue();
+      child.setValue(key, row);
+      safeNestedValue.get().put(key, child);
+    }
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    XmlRowValue that = (XmlRowValue) o;
+
+    Optional<Object> thatPrimitiveValue = Optional.ofNullable(that.getPrimitiveValue());
+    Optional<Object> primitiveValue = Optional.ofNullable(getPrimitiveValue());
+
+    Optional<DateTime> thatPrimitiveDateTime = Optional.ofNullable(that.getDateTimeValue());
+    Optional<DateTime> dateTime = Optional.ofNullable(getDateTimeValue());
+
+    Optional<ArrayList<XmlRowValue>> thatValueList = Optional.ofNullable(that.getValueList());
+    Optional<ArrayList<XmlRowValue>> valueList = Optional.ofNullable(getValueList());
+
+    Optional<HashMap<String, XmlRowValue>> thatNestedValue =
+        Optional.ofNullable(that.getNestedValue());
+    Optional<HashMap<String, XmlRowValue>> nestedValue = Optional.ofNullable(getNestedValue());
+
+    return equals(primitiveValue, thatPrimitiveValue)
+        && equals(dateTime, thatPrimitiveDateTime)
+        && equals(valueList, thatValueList)
+        && equals(nestedValue, thatNestedValue);
+  }
+
+  private static <T> boolean equals(Optional<T> a, Optional<T> b) {
+    if (a.isPresent() && b.isPresent()) {
+      return a.get().equals(b.get());
+    }
+    if (!a.isPresent() && !b.isPresent()) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // resolves dereference of possibly-null reference
+    Optional<Object> primitive = Optional.ofNullable(getPrimitiveValue());
+    Optional<ArrayList<XmlRowValue>> list = Optional.ofNullable(getValueList());
+    Optional<DateTime> dateTime = Optional.ofNullable(getDateTimeValue());
+    Optional<HashMap<String, XmlRowValue>> nested = Optional.ofNullable(getNestedValue());
+
+    int result = primitive.hashCode();
+    result = 31 * result + list.hashCode();
+    result = 31 * result + dateTime.hashCode();
+    result = 31 * result + nested.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "XmlRowValue{" + "primitiveValue=" + primitiveValue + '}';
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..7c8f4ff0c84
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonFileIOWriteFeatures;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.Charset;
+import java.util.Optional;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.XmlConfiguration;
+import org.apache.beam.sdk.io.xml.XmlIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for XML format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class XmlWriteSchemaTransformFormatProvider
+    implements FileWriteSchemaTransformFormatProvider {
+
+  private static final String SUFFIX = String.format(".%s", XML);
+
+  @Override
+  public String identifier() {
+    return XML;
+  }
+
+  /**
+   * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into result
+   * {@link PCollection} file names written using {@link XmlIO.Sink} and {@link FileIO.Write}.
+   */
+  @Override
+  public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+      FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+    return new PTransform<PCollection<Row>, PCollection<String>>() {
+      @Override
+      public PCollection<String> expand(PCollection<Row> input) {
+
+        PCollection<XmlRowAdapter> xml =
+            input.apply(
+                "Row to XML",
+                MapElements.into(TypeDescriptor.of(XmlRowAdapter.class)).via(new RowToXmlFn()));
+
+        XmlConfiguration xmlConfig = xmlConfiguration(configuration);
+
+        checkArgument(!Strings.isNullOrEmpty(xmlConfig.getCharset()), "charset must be specified");
+        checkArgument(
+            !Strings.isNullOrEmpty(xmlConfig.getRootElement()), "rootElement must be specified");
+
+        Charset charset = Charset.forName(xmlConfig.getCharset());
+
+        XmlIO.Sink<XmlRowAdapter> sink =
+            XmlIO.sink(XmlRowAdapter.class)
+                .withCharset(charset)
+                .withRootElement(xmlConfig.getRootElement());
+
+        FileIO.Write<Void, XmlRowAdapter> write =
+            FileIO.<XmlRowAdapter>write()
+                .to(configuration.getFilenamePrefix())
+                .via(sink)
+                .withSuffix(SUFFIX);
+
+        write = applyCommonFileIOWriteFeatures(write, configuration);
+
+        return xml.apply("Write XML", write)
+            .getPerDestinationOutputFilenames()
+            .apply("perDestinationOutputFilenames", Values.create());
+      }
+    };
+  }
+
+  /** A {@link SerializableFunction} for converting {@link Row}s to {@link XmlRowAdapter}s. */
+  static class RowToXmlFn implements SerializableFunction<Row, XmlRowAdapter> {
+
+    @Override
+    public XmlRowAdapter apply(Row input) {
+      XmlRowAdapter result = new XmlRowAdapter();
+      result.wrapRow(input);
+      return result;
+    }
+  }
+
+  private static XmlConfiguration xmlConfiguration(
+      FileWriteSchemaTransformConfiguration configuration) {
+    // resolves Checker Framework incompatible argument for parameter of requireNonNull
+    Optional<XmlConfiguration> result = Optional.ofNullable(configuration.getXmlConfiguration());
+    checkState(result.isPresent());
+    return result.get();
+  }
+}
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
similarity index 100%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..466ea475d31
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link AvroWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class AvroFileWriteSchemaTransformFormatProviderTest
+    extends FileWriteSchemaTransformFormatProviderTest {
+  @Override
+  protected String getFormat() {
+    return AVRO;
+  }
+
+  @Override
+  protected String getFilenamePrefix() {
+    return "/";
+  }
+
+  @Override
+  protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+    List<GenericRecord> expected =
+        rows.stream()
+            .map(AvroUtils.getRowToGenericRecordFunction(avroSchema)::apply)
+            .collect(Collectors.toList());
+
+    PCollection<GenericRecord> actual =
+        readPipeline
+            .apply(AvroIO.readGenericRecords(avroSchema).from(folder + getFilenamePrefix() + "*"))
+            .setCoder(coder);
+
+    PAssert.that(actual).containsInAnyOrder(expected);
+  }
+
+  @Override
+  protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+    return defaultConfiguration(folder);
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCompressionSet() {
+    return Optional.of("configuration with compression is not compatible with AvroIO");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a avro format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a avro format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenNumShardsSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a avro format");
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..015ce5245fe
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SINGLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.csvConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.loadProviders;
+import static org.apache.beam.sdk.values.TypeDescriptors.booleans;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Base class for tests of {@link FileWriteSchemaTransformFormatProvider} implementations. */
+abstract class FileWriteSchemaTransformFormatProviderTest {
+
+  /**
+   * The {@link FileWriteSchemaTransformConfiguration#getFormat()} mapped to this {@link
+   * FileWriteSchemaTransformFormatProvider}.
+   */
+  protected abstract String getFormat();
+
+  /**
+   * The filename prefix of sharded files, required by {@link org.apache.beam.sdk.io.TextIO.Write}
+   * based {@link FileWriteSchemaTransformFormatProvider}s.
+   */
+  protected abstract String getFilenamePrefix();
+
+  /**
+   * Asserts whether the {@link FileWriteSchemaTransformFormatProvider} wrote expected contents to
+   * {@link FileWriteSchemaTransformConfiguration#getFilenamePrefix()}.
+   */
+  protected abstract void assertFolderContainsInAnyOrder(
+      String folder, List<Row> rows, Schema beamSchema);
+
+  /**
+   * Builds {@link FileWriteSchemaTransformConfiguration} specifying the folder {@link
+   * FileWriteSchemaTransformConfiguration#getFilenamePrefix()}.
+   */
+  protected abstract FileWriteSchemaTransformConfiguration buildConfiguration(String folder);
+
+  /**
+   * The expected error message when {@link FileWriteSchemaTransformConfiguration#getCompression()}
+   * is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenCompressionSet();
+
+  @Test
+  public void withCompression() {
+    String to = folder(AllPrimitiveDataTypes.class, "with_compression");
+    Compression compression = Compression.GZIP;
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to).toBuilder().setCompression(compression.name()).build();
+
+    FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+
+    if (expectedErrorWhenCompressionSet().isPresent()) {
+      IllegalArgumentException invalidConfiguration =
+          assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+      assertEquals(expectedErrorWhenCompressionSet().get(), invalidConfiguration.getMessage());
+      return;
+    }
+
+    PCollection<String> files =
+        applyProviderAndAssertFilesWritten(
+            DATA.allPrimitiveDataTypesRows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+
+    PCollection<String> extension =
+        files
+            .apply(
+                "extract extension",
+                MapElements.into(strings())
+                    .via(fullName -> fullName != null ? Files.getFileExtension(fullName) : null))
+            .apply("distinct extensions", Distinct.create());
+
+    PCollection<Boolean> isCompressed =
+        files
+            .apply(
+                "isCompressed",
+                MapElements.into(booleans())
+                    .via(filename -> filename != null && compression.isCompressed(filename)))
+            .apply("distinct isCompressed", Distinct.create());
+
+    PAssert.thatSingleton("Filenames end with compression name", extension).isEqualTo("gz");
+
+    PAssert.thatSingleton("Files should be compressed", isCompressed).isEqualTo(true);
+
+    writePipeline.run();
+  }
+
+  /**
+   * The expected error message when {@link
+   * FileWriteSchemaTransformConfiguration#getParquetConfiguration()} ()} is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenParquetConfigurationSet();
+
+  @Test
+  public void invalidConfigurationWithParquet() {
+    String to = folder(getFormat(), "configuration_with_parquet");
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to)
+            .toBuilder()
+            .setParquetConfiguration(
+                parquetConfigurationBuilder()
+                    .setCompressionCodecName(CompressionCodecName.GZIP.name())
+                    .build())
+            .build();
+
+    FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+
+    if (!expectedErrorWhenParquetConfigurationSet().isPresent()) {
+      // we do not expect an error
+      provider.from(configuration);
+      return;
+    }
+
+    IllegalArgumentException invalidConfigurationError =
+        assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+
+    assertEquals(
+        expectedErrorWhenParquetConfigurationSet().get(), invalidConfigurationError.getMessage());
+  }
+
+  /**
+   * The expected error message when {@link
+   * FileWriteSchemaTransformConfiguration#getXmlConfiguration()} ()} is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenXmlConfigurationSet();
+
+  @Test
+  public void invalidConfigurationWithXml() {
+    String to = folder(getFormat(), "configuration_with_xml");
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to)
+            .toBuilder()
+            .setXmlConfiguration(
+                xmlConfigurationBuilder()
+                    .setRootElement("rootElement")
+                    .setCharset(Charset.defaultCharset().name())
+                    .build())
+            .build();
+
+    FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+    if (!expectedErrorWhenXmlConfigurationSet().isPresent()) {
+      // No error expected
+      provider.from(configuration);
+      return;
+    }
+
+    IllegalArgumentException configurationError =
+        assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+
+    assertEquals(expectedErrorWhenXmlConfigurationSet().get(), configurationError.getMessage());
+  }
+
+  /**
+   * The expected error message when {@link FileWriteSchemaTransformConfiguration#getNumShards()}
+   * ()} is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenNumShardsSet();
+
+  @Test
+  public void numShardsSetConfiguration() {
+    String to = folder(AllPrimitiveDataTypes.class, "num_shards_configuration");
+    int expectedNumShards = 10;
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to).toBuilder().setNumShards(expectedNumShards).build();
+
+    if (expectedErrorWhenNumShardsSet().isPresent()) {
+      FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+      IllegalArgumentException configurationError =
+          assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+      assertEquals(expectedErrorWhenNumShardsSet().get(), configurationError.getMessage());
+      return;
+    }
+
+    List<Row> rows = new ArrayList<>(DATA.allPrimitiveDataTypesRows);
+    for (int i = 0; i < 100; i++) {
+      rows.addAll(DATA.allPrimitiveDataTypesRows);
+    }
+
+    PCollection<String> files =
+        applyProviderAndAssertFilesWritten(rows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+    PCollection<Long> count = files.apply(Count.globally());
+    PAssert.thatSingleton("Amount of files created should match numShards", count)
+        .isEqualTo(Integer.valueOf(expectedNumShards).longValue());
+
+    writePipeline.run();
+  }
+
+  /**
+   * The expected error message when {@link
+   * FileWriteSchemaTransformConfiguration#getShardNameTemplate()} ()} is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenShardNameTemplateSet();
+
+  @Test
+  public void shardNameTemplateSetConfiguration() {
+    String to = folder(AllPrimitiveDataTypes.class, "shard_name_template");
+    String shardNameTemplate = "-SS-of-NN";
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to).toBuilder().setShardNameTemplate(shardNameTemplate).build();
+
+    if (expectedErrorWhenShardNameTemplateSet().isPresent()) {
+      FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+      IllegalArgumentException configurationError =
+          assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+      assertEquals(expectedErrorWhenShardNameTemplateSet().get(), configurationError.getMessage());
+      return;
+    }
+
+    PCollection<String> files =
+        applyProviderAndAssertFilesWritten(
+            DATA.allPrimitiveDataTypesRows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+    PAssert.that("All file names match shard name template", files)
+        .satisfies(
+            (Iterable<String> names) -> {
+              assertNotNull(names);
+              for (String name : names) {
+                assertTrue(name.matches("^.*\\d\\d-of-\\d\\d.*$"));
+              }
+              return null;
+            });
+
+    writePipeline.run();
+  }
+
+  /**
+   * The expected error message when {@link
+   * FileWriteSchemaTransformConfiguration#getCsvConfiguration()} ()} is not null.
+   */
+  protected abstract Optional<String> expectedErrorWhenCsvConfigurationSet();
+
+  @Test
+  public void csvConfigurationSet() {
+    String to = folder(getFormat(), "csv_configuration");
+    FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+    FileWriteSchemaTransformConfiguration configuration =
+        buildConfiguration(to)
+            .toBuilder()
+            .setCsvConfiguration(csvConfigurationBuilder().build())
+            .build();
+    if (!expectedErrorWhenCsvConfigurationSet().isPresent()) {
+      // No error expected
+      provider.from(configuration);
+      return;
+    }
+    IllegalArgumentException configurationError =
+        assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+    assertEquals(expectedErrorWhenCsvConfigurationSet().get(), configurationError.getMessage());
+  }
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void allPrimitiveDataTypes() {
+    String to = folder(SchemaAwareJavaBeans.AllPrimitiveDataTypes.class);
+    Schema schema = ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.allPrimitiveDataTypesRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void nullableAllPrimitiveDataTypes() {
+    String to = folder(SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes.class);
+    Schema schema = NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.nullableAllPrimitiveDataTypesRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void timeContaining() {
+    String to = folder(SchemaAwareJavaBeans.TimeContaining.class);
+    Schema schema = TIME_CONTAINING_SCHEMA;
+    List<Row> rows = DATA.timeContainingRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void arrayPrimitiveDataTypes() {
+    String to = folder(SchemaAwareJavaBeans.ArrayPrimitiveDataTypes.class);
+    Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.arrayPrimitiveDataTypesRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void singlyNestedDataTypesNoRepeat() {
+    String to = folder(SchemaAwareJavaBeans.SinglyNestedDataTypes.class, "no_repeat");
+    Schema schema = SINGLY_NESTED_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.singlyNestedDataTypesNoRepeatRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void singlyNestedDataTypesRepeated() {
+    String to = folder(SchemaAwareJavaBeans.SinglyNestedDataTypes.class, "repeated");
+    Schema schema = SINGLY_NESTED_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.singlyNestedDataTypesNoRepeatRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void doublyNestedDataTypesNoRepeat() {
+    String to = folder(SchemaAwareJavaBeans.DoublyNestedDataTypes.class, "no_repeat");
+    Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.doublyNestedDataTypesNoRepeatRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  @Test
+  public void doublyNestedDataTypesRepeat() {
+    String to = folder(SchemaAwareJavaBeans.DoublyNestedDataTypes.class, "repeated");
+    Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.doublyNestedDataTypesRepeatRows;
+    applyProviderAndAssertFilesWritten(to, rows, schema);
+    writePipeline.run().waitUntilFinish();
+    assertFolderContainsInAnyOrder(to, rows, schema);
+    readPipeline.run();
+  }
+
+  private FileWriteSchemaTransformFormatProvider getProvider() {
+    return loadProviders().get(getFormat());
+  }
+
+  private <T> String folder(Class<T> clazz, String additionalPath) {
+    return folder(getFormat(), clazz.getSimpleName(), additionalPath);
+  }
+
+  private <T> String folder(Class<T> clazz) {
+    return folder(getFormat(), clazz.getSimpleName());
+  }
+
+  private String folder(String... paths) {
+    try {
+      return tmpFolder.newFolder(paths).getAbsolutePath() + getFilenamePrefix();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private PCollection<String> applyProviderAndAssertFilesWritten(
+      String folder, List<Row> rows, Schema schema) {
+    return applyProviderAndAssertFilesWritten(rows, schema, buildConfiguration(folder));
+  }
+
+  private PCollection<String> applyProviderAndAssertFilesWritten(
+      List<Row> rows, Schema schema, FileWriteSchemaTransformConfiguration configuration) {
+    PCollection<Row> input = writePipeline.apply(Create.of(rows).withRowSchema(schema));
+    PCollection<String> files = input.apply(getProvider().buildTransform(configuration, schema));
+    PCollection<Long> count = files.apply("count number of files", Count.globally());
+    PAssert.thatSingleton("At least one file should be written", count).notEqualTo(0L);
+    return files;
+  }
+
+  protected FileWriteSchemaTransformConfiguration defaultConfiguration(String folder) {
+    return FileWriteSchemaTransformConfiguration.builder()
+        .setFormat(getFormat())
+        .setFilenamePrefix(folder)
+        .build();
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java
new file mode 100644
index 00000000000..82ea6deeb71
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ArrayPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DoublyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SinglyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+
+/** Shared {@link SchemaAwareJavaBeans} data to be used across various tests. */
+class FileWriteSchemaTransformFormatProviderTestData {
+  static final FileWriteSchemaTransformFormatProviderTestData DATA =
+      new FileWriteSchemaTransformFormatProviderTestData();
+
+  /* Prevent instantiation outside this class. */
+  private FileWriteSchemaTransformFormatProviderTestData() {}
+
+  final List<AllPrimitiveDataTypes> allPrimitiveDataTypesList =
+      Arrays.asList(
+          allPrimitiveDataTypes(
+              false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a"),
+          allPrimitiveDataTypes(
+              true, (byte) 2, BigDecimal.valueOf(2L), 2.2345, 2.2345f, (short) 2, 2, 2L, "b"),
+          allPrimitiveDataTypes(
+              false, (byte) 3, BigDecimal.valueOf(3L), 3.2345, 3.2345f, (short) 3, 3, 3L, "c"));
+
+  final List<Row> allPrimitiveDataTypesRows =
+      allPrimitiveDataTypesList.stream()
+          .map(allPrimitiveDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<NullableAllPrimitiveDataTypes> nullableAllPrimitiveDataTypesList =
+      Arrays.asList(
+          nullableAllPrimitiveDataTypes(null, null, null, null, null, null),
+          nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, null),
+          nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, null, "a"),
+          nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, null, 1L, "a"),
+          nullableAllPrimitiveDataTypes(false, 1.2345, null, 1, 1L, "a"),
+          nullableAllPrimitiveDataTypes(false, null, 1.2345f, 1, 1L, "a"),
+          nullableAllPrimitiveDataTypes(null, 1.2345, 1.2345f, 1, 1L, "a"),
+          nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, "a"));
+
+  final List<Row> nullableAllPrimitiveDataTypesRows =
+      nullableAllPrimitiveDataTypesList.stream()
+          .map(nullableAllPrimitiveDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<TimeContaining> timeContainingList =
+      Arrays.asList(
+          timeContaining(
+              Instant.ofEpochMilli(1L), Collections.singletonList(Instant.ofEpochMilli(2L))),
+          timeContaining(
+              Instant.ofEpochMilli(Long.MAX_VALUE - 2L),
+              Arrays.asList(Instant.ofEpochMilli(3L), Instant.ofEpochMilli(4L))));
+
+  final List<Row> timeContainingRows =
+      timeContainingList.stream().map(timeContainingToRowFn()::apply).collect(Collectors.toList());
+
+  final List<ArrayPrimitiveDataTypes> arrayPrimitiveDataTypesList =
+      Arrays.asList(
+          arrayPrimitiveDataTypes(
+              Collections.singletonList(false),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.singletonList(Double.MAX_VALUE),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.singletonList(Float.MAX_VALUE),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.singletonList(Short.MAX_VALUE),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.singletonList(Integer.MAX_VALUE),
+              Collections.emptyList(),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.singletonList(Long.MAX_VALUE),
+              Collections.emptyList()),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.singletonList(
+                  Stream.generate(() -> "🦄").limit(100).collect(Collectors.joining("")))),
+          arrayPrimitiveDataTypes(
+              Arrays.asList(false, true, false),
+              Arrays.asList(Double.MIN_VALUE, 0.0, Double.MAX_VALUE),
+              Arrays.asList(Float.MIN_VALUE, 0.0f, Float.MAX_VALUE),
+              Arrays.asList(Short.MIN_VALUE, (short) 0, Short.MAX_VALUE),
+              Arrays.asList(Integer.MIN_VALUE, 0, Integer.MAX_VALUE),
+              Arrays.asList(Long.MIN_VALUE, 0L, Long.MAX_VALUE),
+              Arrays.asList(
+                  Stream.generate(() -> "🐤").limit(100).collect(Collectors.joining("")),
+                  Stream.generate(() -> "🐥").limit(100).collect(Collectors.joining("")),
+                  Stream.generate(() -> "🐣").limit(100).collect(Collectors.joining("")))),
+          arrayPrimitiveDataTypes(
+              Stream.generate(() -> true).limit(Short.MAX_VALUE).collect(Collectors.toList()),
+              Stream.generate(() -> Double.MIN_VALUE).limit(100).collect(Collectors.toList()),
+              Stream.generate(() -> Float.MIN_VALUE).limit(100).collect(Collectors.toList()),
+              Stream.generate(() -> Short.MIN_VALUE).limit(100).collect(Collectors.toList()),
+              Stream.generate(() -> Integer.MIN_VALUE).limit(100).collect(Collectors.toList()),
+              Stream.generate(() -> Long.MIN_VALUE).limit(100).collect(Collectors.toList()),
+              Stream.generate(() -> "🐿").limit(100).collect(Collectors.toList())),
+          arrayPrimitiveDataTypes(
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList(),
+              Collections.emptyList()));
+
+  final List<Row> arrayPrimitiveDataTypesRows =
+      arrayPrimitiveDataTypesList.stream()
+          .map(arrayPrimitiveDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<SinglyNestedDataTypes> singlyNestedDataTypesNoRepeat =
+      allPrimitiveDataTypesList.stream()
+          .map(SchemaAwareJavaBeans::singlyNestedDataTypes)
+          .collect(Collectors.toList());
+
+  final List<Row> singlyNestedDataTypesNoRepeatRows =
+      singlyNestedDataTypesNoRepeat.stream()
+          .map(singlyNestedDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<SinglyNestedDataTypes> singlyNestedDataTypesRepeated =
+      allPrimitiveDataTypesList.stream()
+          .map(
+              (AllPrimitiveDataTypes element) ->
+                  singlyNestedDataTypes(element, element, element, element))
+          .collect(Collectors.toList());
+
+  final List<Row> singlyNestedDataTypesRepeatedRows =
+      singlyNestedDataTypesRepeated.stream()
+          .map(singlyNestedDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<DoublyNestedDataTypes> doublyNestedDataTypesNoRepeat =
+      singlyNestedDataTypesNoRepeat.stream()
+          .map(SchemaAwareJavaBeans::doublyNestedDataTypes)
+          .collect(Collectors.toList());
+
+  final List<Row> doublyNestedDataTypesNoRepeatRows =
+      doublyNestedDataTypesNoRepeat.stream()
+          .map(doublyNestedDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+
+  final List<DoublyNestedDataTypes> doublyNestedDataTypesRepeat =
+      singlyNestedDataTypesRepeated.stream()
+          .map((SinglyNestedDataTypes element) -> doublyNestedDataTypes(element, element, element))
+          .collect(Collectors.toList());
+
+  final List<Row> doublyNestedDataTypesRepeatRows =
+      doublyNestedDataTypesRepeat.stream()
+          .map(doublyNestedDataTypesToRowFn()::apply)
+          .collect(Collectors.toList());
+}
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
similarity index 64%
copy from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
copy to sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
index 6081e783ec2..b145243fb14 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
@@ -17,10 +17,16 @@
  */
 package org.apache.beam.sdk.io.fileschematransform;
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.CSV;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.junit.Assert.assertEquals;
 
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -28,16 +34,11 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
 @RunWith(JUnit4.class)
 public class FileWriteSchemaTransformFormatProvidersTest {
-
   @Test
-  public void testLoadProviders() {
+  public void loadProviders() {
     Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
         FileWriteSchemaTransformFormatProviders.loadProviders();
     Set<String> keys = formatProviderMap.keySet();
-    assertTrue(keys.contains("avro"));
-    assertTrue(keys.contains("csv"));
-    assertTrue(keys.contains("json"));
-    assertTrue(keys.contains("parquet"));
-    assertTrue(keys.contains("xml"));
+    assertEquals(ImmutableSet.of(AVRO, CSV, JSON, PARQUET, XML), keys);
   }
 }
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..e1cc231f934
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider.INPUT_TAG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider.FileWriteSchemaTransform;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FileWriteSchemaTransformProvider}. */
+@RunWith(JUnit4.class)
+public class FileWriteSchemaTransformProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<FileWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(FileWriteSchemaTransformConfiguration.class);
+  private static final SerializableFunction<FileWriteSchemaTransformConfiguration, Row> TO_ROW_FN =
+      AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+  private static final FileWriteSchemaTransformProvider PROVIDER =
+      new FileWriteSchemaTransformProvider();
+
+  @Rule
+  public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test
+  public void receivedUnexpectedInputTagsThrowsAnError() {
+    SchemaTransform transform =
+        PROVIDER.from(rowConfiguration(defaultConfiguration().setFormat(JSON).build()));
+    PCollectionRowTuple empty = PCollectionRowTuple.empty(errorPipeline);
+    IllegalArgumentException emptyInputError =
+        assertThrows(IllegalArgumentException.class, () -> empty.apply(transform.buildTransform()));
+
+    assertEquals(
+        "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection<Row> input",
+        emptyInputError.getMessage());
+
+    PCollection<Row> rows1 =
+        errorPipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypesRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    PCollection<Row> rows2 =
+        errorPipeline.apply(
+            Create.of(DATA.timeContainingRows).withRowSchema(TIME_CONTAINING_SCHEMA));
+
+    PCollectionRowTuple tooManyTags =
+        PCollectionRowTuple.of(INPUT_TAG, rows1).and("another", rows2);
+    IllegalArgumentException tooManyTagsError =
+        assertThrows(
+            IllegalArgumentException.class, () -> tooManyTags.apply(transform.buildTransform()));
+
+    assertEquals(
+        "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection<Row> input",
+        tooManyTagsError.getMessage());
+
+    // should not throw an error
+    PCollectionRowTuple input = PCollectionRowTuple.of(INPUT_TAG, rows1);
+    input.apply(transform.buildTransform());
+  }
+
+  @Test
+  public void formatMapsToFileWriteSchemaFormatTransform() {
+    Row avro = rowConfiguration(defaultConfiguration().setFormat(AVRO).build());
+    FileWriteSchemaTransformFormatProvider avroFormatProvider =
+        ((FileWriteSchemaTransform) PROVIDER.from(avro)).getProvider();
+    assertTrue(avroFormatProvider instanceof AvroWriteSchemaTransformFormatProvider);
+
+    Row json = rowConfiguration(defaultConfiguration().setFormat(JSON).build());
+    FileWriteSchemaTransformFormatProvider jsonFormatProvider =
+        ((FileWriteSchemaTransform) PROVIDER.from(json)).getProvider();
+    assertTrue(jsonFormatProvider instanceof JsonWriteSchemaTransformFormatProvider);
+
+    Row parquet = rowConfiguration(defaultConfiguration().setFormat(PARQUET).build());
+    FileWriteSchemaTransformFormatProvider parquetFormatProvider =
+        ((FileWriteSchemaTransform) PROVIDER.from(parquet)).getProvider();
+    assertTrue(parquetFormatProvider instanceof ParquetWriteSchemaTransformFormatProvider);
+
+    Row xml = rowConfiguration(defaultConfiguration().setFormat(XML).build());
+    FileWriteSchemaTransformFormatProvider xmlFormatProvider =
+        ((FileWriteSchemaTransform) PROVIDER.from(xml)).getProvider();
+    assertTrue(xmlFormatProvider instanceof XmlWriteSchemaTransformFormatProvider);
+  }
+
+  private static Row rowConfiguration(FileWriteSchemaTransformConfiguration configuration) {
+    return TO_ROW_FN.apply(configuration);
+  }
+
+  private static FileWriteSchemaTransformConfiguration.Builder defaultConfiguration() {
+    return FileWriteSchemaTransformConfiguration.builder()
+        .setFilenamePrefix(FileWriteSchemaTransformProviderTest.class.getSimpleName());
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..e380040565b
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link JsonWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class JsonFileWriteSchemaTransformFormatProviderTest
+    extends FileWriteSchemaTransformFormatProviderTest {
+  @Override
+  protected String getFormat() {
+    return JSON;
+  }
+
+  @Override
+  protected String getFilenamePrefix() {
+    return "/out";
+  }
+
+  @Override
+  protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+    PCollection<String> actual = readPipeline.apply(TextIO.read().from(folder + "*"));
+
+    PayloadSerializer payloadSerializer =
+        new JsonPayloadSerializerProvider().getSerializer(beamSchema, ImmutableMap.of());
+
+    PAssert.that(actual)
+        .containsInAnyOrder(
+            rows.stream()
+                .map(
+                    (Row row) ->
+                        new String(payloadSerializer.serialize(row), StandardCharsets.UTF_8))
+                .collect(Collectors.toList()));
+  }
+
+  @Override
+  protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+    return defaultConfiguration(folder);
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCompressionSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a json format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a json format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenNumShardsSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a json format");
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..ea51f235c7f
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ParquetWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class ParquetFileWriteSchemaTransformFormatProviderTest
+    extends FileWriteSchemaTransformFormatProviderTest {
+  @Override
+  protected String getFormat() {
+    return PARQUET;
+  }
+
+  @Override
+  protected String getFilenamePrefix() {
+    return "";
+  }
+
+  @Override
+  protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
+    List<GenericRecord> expected =
+        rows.stream()
+            .map(AvroUtils.getRowToGenericRecordFunction(avroSchema)::apply)
+            .collect(Collectors.toList());
+
+    PCollection<GenericRecord> actual =
+        readPipeline.apply(
+            ParquetIO.read(avroSchema)
+                .from(folder + "/" + getFilenamePrefix() + "*")
+                .withProjection(avroSchema, avroSchema));
+
+    PAssert.that(actual).containsInAnyOrder(expected);
+  }
+
+  @Override
+  protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+    return FileWriteSchemaTransformConfiguration.builder()
+        .setParquetConfiguration(
+            parquetConfigurationBuilder()
+                .setCompressionCodecName(CompressionCodecName.GZIP.name())
+                .build())
+        .setFormat(getFormat())
+        .setFilenamePrefix(folder + getFilenamePrefix())
+        .build();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCompressionSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a parquet format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenNumShardsSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a parquet format");
+  }
+}
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
similarity index 56%
rename from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
rename to sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
index 6081e783ec2..2a5584b89bc 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
@@ -17,27 +17,31 @@
  */
 package org.apache.beam.sdk.io.fileschematransform;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Map;
-import java.util.Set;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
+/** Tests for {@link XmlDateTimeAdapter}. */
 @RunWith(JUnit4.class)
-public class FileWriteSchemaTransformFormatProvidersTest {
+public class XmlDateTimeAdapterTest {
 
   @Test
-  public void testLoadProviders() {
-    Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
-        FileWriteSchemaTransformFormatProviders.loadProviders();
-    Set<String> keys = formatProviderMap.keySet();
-    assertTrue(keys.contains("avro"));
-    assertTrue(keys.contains("csv"));
-    assertTrue(keys.contains("json"));
-    assertTrue(keys.contains("parquet"));
-    assertTrue(keys.contains("xml"));
+  public void unmarshal() throws Exception {
+    XmlDateTimeAdapter adapter = new XmlDateTimeAdapter();
+    String dateTimeInput = "2022-12-29T21:04:51.171Z";
+    assertEquals(
+        Instant.ofEpochMilli(1672347891171L).toDateTime(), adapter.unmarshal(dateTimeInput));
+  }
+
+  @Test
+  public void marshal() throws Exception {
+    XmlDateTimeAdapter adapter = new XmlDateTimeAdapter();
+    DateTime dateTime = Instant.ofEpochMilli(1672347891171L).toDateTime(DateTimeZone.UTC);
+    assertEquals("2022-12-29T21:04:51.171Z", adapter.marshal(dateTime));
   }
 }
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..87d59364702
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.xml.XmlIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link XmlWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class XmlFileWriteSchemaTransformFormatProviderTest
+    extends FileWriteSchemaTransformFormatProviderTest {
+
+  private static final String ROOT_ELEMENT = "rootElement";
+  private static final String RECORD_ELEMENT = "row";
+
+  @Override
+  protected String getFormat() {
+    return XML;
+  }
+
+  @Override
+  protected String getFilenamePrefix() {
+    return "";
+  }
+
+  @Override
+  protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+    List<XmlRowAdapter> expected =
+        rows.stream()
+            .map(
+                (Row row) -> {
+                  XmlRowAdapter result = new XmlRowAdapter();
+                  result.wrapRow(row);
+                  return result;
+                })
+            .collect(Collectors.toList());
+
+    PCollection<XmlRowAdapter> actual =
+        readPipeline.apply(
+            XmlIO.<XmlRowAdapter>read()
+                .from(folder + "/*")
+                .withRecordClass(XmlRowAdapter.class)
+                .withRootElement(ROOT_ELEMENT)
+                .withRecordElement(RECORD_ELEMENT)
+                .withCharset(Charset.defaultCharset()));
+
+    PAssert.that(actual).containsInAnyOrder(expected);
+  }
+
+  @Override
+  protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+    return FileWriteSchemaTransformConfiguration.builder()
+        .setFormat(XML)
+        .setXmlConfiguration(
+            FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder()
+                .setRootElement(ROOT_ELEMENT)
+                .build())
+        .setFilenamePrefix(folder)
+        .setNumShards(1)
+        .build();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCompressionSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a xml format");
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenNumShardsSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+    return Optional.empty();
+  }
+
+  @Override
+  protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+    return Optional.of(
+        "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a xml format");
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java
new file mode 100644
index 00000000000..a23476e0adb
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+/** Tests for {@link XmlRowAdapter}. */
+@RunWith(JUnit4.class)
+public class XmlRowAdapterTest {
+
+  @Test
+  public void allPrimitiveDataTypes()
+      throws XPathExpressionException, JAXBException, IOException, SAXException,
+          ParserConfigurationException {
+
+    for (Row row : DATA.allPrimitiveDataTypesRows) {
+      NodeList entries = xmlDocumentEntries(row);
+      assertEquals(ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames().size(), entries.getLength());
+      Map<String, Node> actualMap = keyValues("allPrimitiveDataTypes", entries);
+      assertEquals(
+          new HashSet<>(ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames()), actualMap.keySet());
+      for (Entry<String, Node> actualKV : actualMap.entrySet()) {
+        String key = actualKV.getKey();
+        Node node = actualKV.getValue();
+        String actual = node.getTextContent();
+        Optional<Object> safeExpected = Optional.ofNullable(row.getValue(key));
+        assertTrue(safeExpected.isPresent());
+        String expected = safeExpected.get().toString();
+        assertEquals(expected, actual);
+      }
+    }
+  }
+
+  @Test
+  public void nullableAllPrimitiveDataTypes()
+      throws XPathExpressionException, JAXBException, IOException, SAXException,
+          ParserConfigurationException {
+    for (Row row : DATA.nullableAllPrimitiveDataTypesRows) {
+      NodeList entries = xmlDocumentEntries(row);
+      assertEquals(
+          NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames().size(), entries.getLength());
+      Map<String, Node> actualMap = keyValues("nullableAllPrimitiveDataTypes", entries);
+      assertEquals(
+          new HashSet<>(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames()),
+          actualMap.keySet());
+      for (Entry<String, Node> actualKV : actualMap.entrySet()) {
+        String key = actualKV.getKey();
+        Node node = actualKV.getValue();
+        String actual = node.getTextContent();
+        String expected = "";
+        Optional<Object> safeExpected = Optional.ofNullable(row.getValue(key));
+        if (safeExpected.isPresent()) {
+          expected = safeExpected.get().toString();
+        }
+        assertEquals(expected, actual);
+      }
+    }
+  }
+
+  @Test
+  public void timeContaining()
+      throws XPathExpressionException, JAXBException, IOException, ParserConfigurationException,
+          SAXException {
+    String instant = "instant";
+    DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
+    String instantList = "instantList";
+    for (Row row : DATA.timeContainingRows) {
+      Optional<TimeContaining> safeExpectedTimeContaining =
+          Optional.ofNullable(timeContainingFromRowFn().apply(row));
+      assertTrue(safeExpectedTimeContaining.isPresent());
+      TimeContaining expectedTimeContaining = safeExpectedTimeContaining.get();
+      NodeList entries = xmlDocumentEntries(row);
+      assertEquals(TIME_CONTAINING_SCHEMA.getFieldNames().size(), entries.getLength());
+      Map<String, Node> actualMap = keyValues("timeContaining", entries);
+      assertEquals(new HashSet<>(TIME_CONTAINING_SCHEMA.getFieldNames()), actualMap.keySet());
+      Node actualInstantNode = actualMap.get(instant);
+      String actual = actualInstantNode.getTextContent();
+      String expected = formatter.print(expectedTimeContaining.getInstant().toDateTime());
+      assertEquals(expected, actual);
+
+      List<String> actualInstantList =
+          toStringList(actualMap.get(instantList).getChildNodes()).stream()
+              .sorted()
+              .collect(Collectors.toList());
+      List<String> expectedInstantList =
+          expectedTimeContaining.getInstantList().stream()
+              .map(Instant::toDateTime)
+              .map(formatter::print)
+              .sorted()
+              .collect(Collectors.toList());
+      assertEquals(expectedInstantList, actualInstantList);
+    }
+  }
+
+  private static List<String> toStringList(NodeList nodes) {
+    List<String> result = new ArrayList<>();
+    for (int i = 0; i < nodes.getLength(); i++) {
+      Node node = nodes.item(i);
+      result.add(node.getTextContent());
+    }
+    return result;
+  }
+
+  private static Map<String, Node> keyValues(String testName, NodeList entries) {
+    Map<String, Node> result = new HashMap<>();
+    for (int i = 0; i < entries.getLength(); i++) {
+      NodeList kv = entries.item(i).getChildNodes();
+      assertEquals(testName, 2, kv.getLength());
+      String key = kv.item(0).getTextContent();
+      Node value = kv.item(1);
+      result.put(key, value);
+    }
+    return result;
+  }
+
+  private NodeList xmlDocumentEntries(Row row)
+      throws JAXBException, IOException, SAXException, XPathExpressionException,
+          ParserConfigurationException {
+    JAXBContext context = JAXBContext.newInstance(XmlRowAdapter.class);
+    Marshaller marshaller = context.createMarshaller();
+    DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
+    DocumentBuilder documentBuilder = builderFactory.newDocumentBuilder();
+    XPath entryPath = XPathFactory.newInstance().newXPath();
+    XPathExpression entryPathExpression = entryPath.compile("/row/data/entry");
+    XmlRowAdapter adapter = new XmlRowAdapter();
+    adapter.wrapRow(row);
+    StringWriter writer = new StringWriter();
+    marshaller.marshal(adapter, writer);
+    String content = writer.toString();
+    Document xmlDocument = documentBuilder.parse(new InputSource(new StringReader(content)));
+    return (NodeList) entryPathExpression.evaluate(xmlDocument, XPathConstants.NODESET);
+  }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java
new file mode 100644
index 00000000000..fbe88359f95
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+import org.junit.Test;
+
+/** Tests for {@link XmlRowValue}. */
+public class XmlRowValueTest {
+
+  @Test
+  public void allPrimitiveDataTypes() {
+    String aBoolean = "aBoolean";
+    String aByte = "aByte";
+    String aDecimal = "aDecimal";
+    String aDouble = "aDouble";
+    String aFloat = "aFloat";
+    String aShort = "aShort";
+    String anInteger = "anInteger";
+    String aLong = "aLong";
+    String aString = "aString";
+
+    for (Row row : DATA.allPrimitiveDataTypesRows) {
+      XmlRowValue aBooleanValue = new XmlRowValue();
+      aBooleanValue.setValue(aBoolean, row);
+      assertEquals(aBoolean, row.getValue(aBoolean), aBooleanValue.getPrimitiveValue());
+
+      XmlRowValue aByteValue = new XmlRowValue();
+      aByteValue.setValue(aByte, row);
+      assertEquals(aByte, row.getValue(aByte), aByteValue.getPrimitiveValue());
+
+      XmlRowValue aDecimalValue = new XmlRowValue();
+      aDecimalValue.setValue(aDecimal, row);
+      assertEquals(aDecimal, row.getValue(aDecimal), aDecimalValue.getPrimitiveValue());
+
+      XmlRowValue aDoubleValue = new XmlRowValue();
+      aDoubleValue.setValue(aDouble, row);
+      assertEquals(aDouble, row.getValue(aDouble), aDoubleValue.getPrimitiveValue());
+
+      XmlRowValue aFloatValue = new XmlRowValue();
+      aFloatValue.setValue(aFloat, row);
+      assertEquals(aFloat, row.getValue(aFloat), aFloatValue.getPrimitiveValue());
+
+      XmlRowValue aShortValue = new XmlRowValue();
+      aShortValue.setValue(aShort, row);
+      assertEquals(aShort, row.getValue(aShort), aShortValue.getPrimitiveValue());
+
+      XmlRowValue anIntegerValue = new XmlRowValue();
+      anIntegerValue.setValue(anInteger, row);
+      assertEquals(anInteger, row.getValue(anInteger), anIntegerValue.getPrimitiveValue());
+
+      XmlRowValue aLongValue = new XmlRowValue();
+      aLongValue.setValue(aLong, row);
+      assertEquals(aLong, row.getValue(aLong), aLongValue.getPrimitiveValue());
+
+      XmlRowValue aStringValue = new XmlRowValue();
+      aStringValue.setValue(aString, row);
+      assertEquals(aString, row.getValue(aString), aStringValue.getPrimitiveValue());
+    }
+  }
+
+  @Test
+  public void nullableAllPrimitiveDataTypes() {
+    String aBoolean = "aBoolean";
+    String aDouble = "aDouble";
+    String aFloat = "aFloat";
+    String anInteger = "anInteger";
+    String aLong = "aLong";
+    String aString = "aString";
+    for (Row row : DATA.nullableAllPrimitiveDataTypesRows) {
+      XmlRowValue aBooleanValue = new XmlRowValue();
+      aBooleanValue.setValue(aBoolean, row);
+      assertEquals(aBoolean, row.getValue(aBoolean), aBooleanValue.getPrimitiveValue());
+
+      XmlRowValue aDoubleValue = new XmlRowValue();
+      aDoubleValue.setValue(aDouble, row);
+      assertEquals(aDouble, row.getValue(aDouble), aDoubleValue.getPrimitiveValue());
+
+      XmlRowValue aFloatValue = new XmlRowValue();
+      aFloatValue.setValue(aFloat, row);
+      assertEquals(aFloat, row.getValue(aFloat), aFloatValue.getPrimitiveValue());
+
+      XmlRowValue anIntegerValue = new XmlRowValue();
+      anIntegerValue.setValue(anInteger, row);
+      assertEquals(anInteger, row.getValue(anInteger), anIntegerValue.getPrimitiveValue());
+
+      XmlRowValue aLongValue = new XmlRowValue();
+      aLongValue.setValue(aLong, row);
+      assertEquals(aLong, row.getValue(aLong), aLongValue.getPrimitiveValue());
+
+      XmlRowValue aStringValue = new XmlRowValue();
+      aStringValue.setValue(aString, row);
+      assertEquals(aString, row.getValue(aString), aStringValue.getPrimitiveValue());
+    }
+  }
+
+  @Test
+  public void arrayPrimitiveDataTypes() {
+    String booleans = "booleanList";
+    String doubles = "doubleList";
+    String floats = "floatList";
+    String shorts = "shortList";
+    String integers = "integerList";
+    String longs = "longList";
+    String strings = "stringList";
+
+    for (Row row : DATA.arrayPrimitiveDataTypesRows) {
+      XmlRowValue booleansValue = new XmlRowValue();
+      booleansValue.setValue(booleans, row);
+      Optional<ArrayList<XmlRowValue>> booleansValueList =
+          Optional.ofNullable(booleansValue.getValueList());
+      assertTrue(booleans, booleansValueList.isPresent());
+      assertEquals(
+          booleans,
+          row.getArray(booleans),
+          booleansValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue doublesValue = new XmlRowValue();
+      doublesValue.setValue(doubles, row);
+      Optional<ArrayList<XmlRowValue>> doublesValueList =
+          Optional.ofNullable(doublesValue.getValueList());
+      assertTrue(doubles, doublesValueList.isPresent());
+      assertEquals(
+          doubles,
+          row.getArray(doubles),
+          doublesValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue floatsValue = new XmlRowValue();
+      floatsValue.setValue(floats, row);
+      Optional<ArrayList<XmlRowValue>> floatsValueList =
+          Optional.ofNullable(floatsValue.getValueList());
+      assertTrue(floats, floatsValueList.isPresent());
+      assertEquals(
+          floats,
+          row.getArray(floats),
+          floatsValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue shortsValue = new XmlRowValue();
+      shortsValue.setValue(shorts, row);
+      Optional<ArrayList<XmlRowValue>> shortsValueList =
+          Optional.ofNullable(shortsValue.getValueList());
+      assertTrue(shorts, shortsValueList.isPresent());
+      assertEquals(
+          shorts,
+          row.getArray(shorts),
+          shortsValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue integersValue = new XmlRowValue();
+      integersValue.setValue(integers, row);
+      Optional<ArrayList<XmlRowValue>> integersValueList =
+          Optional.ofNullable(integersValue.getValueList());
+      assertTrue(integers, integersValueList.isPresent());
+      assertEquals(
+          integers,
+          row.getArray(integers),
+          integersValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue longsValue = new XmlRowValue();
+      longsValue.setValue(longs, row);
+      Optional<ArrayList<XmlRowValue>> longsValueList =
+          Optional.ofNullable(longsValue.getValueList());
+      assertTrue(longs, longsValueList.isPresent());
+      assertEquals(
+          longs,
+          row.getArray(longs),
+          longsValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+
+      XmlRowValue stringsValue = new XmlRowValue();
+      stringsValue.setValue(strings, row);
+      Optional<ArrayList<XmlRowValue>> stringsValueList =
+          Optional.ofNullable(stringsValue.getValueList());
+      assertTrue(strings, stringsValueList.isPresent());
+      assertEquals(
+          strings,
+          row.getArray(strings),
+          stringsValueList.get().stream()
+              .map(XmlRowValue::getPrimitiveValue)
+              .collect(Collectors.toList()));
+    }
+  }
+
+  @Test
+  public void timeContaining() {
+    String instant = "instant";
+    String instantList = "instantList";
+    for (Row row : DATA.timeContainingRows) {
+      XmlRowValue instantValue = new XmlRowValue();
+      instantValue.setValue(instant, row);
+      Optional<ReadableDateTime> expected = Optional.ofNullable(row.getDateTime(instant));
+      Optional<DateTime> actual = Optional.ofNullable(instantValue.getDateTimeValue());
+      assertTrue(instant, expected.isPresent());
+      assertTrue(instant, actual.isPresent());
+      assertEquals(instant, expected.get().getMillis(), actual.get().getMillis());
+
+      XmlRowValue instantListValue = new XmlRowValue();
+      instantListValue.setValue(instantList, row);
+      Optional<Collection<Instant>> expectedList = Optional.ofNullable(row.getArray(instantList));
+      Optional<List<XmlRowValue>> actualList = Optional.ofNullable(instantListValue.getValueList());
+      assertTrue(instantList, expectedList.isPresent());
+      assertTrue(instantList, actualList.isPresent());
+      assertFalse(instantList, expectedList.get().isEmpty());
+      assertFalse(instantList, actualList.get().isEmpty());
+
+      assertEquals(
+          instantList,
+          expectedList.get().stream().map(Instant::getMillis).collect(Collectors.toList()),
+          dateTimes(actualList.get()).stream()
+              .map(DateTime::getMillis)
+              .collect(Collectors.toList()));
+    }
+  }
+
+  @Test
+  public void singlyNestedDataTypesNoRepeat() {
+    String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+    String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+    for (Row row : DATA.singlyNestedDataTypesNoRepeatRows) {
+      XmlRowValue allPrimitiveDataTypesValue = new XmlRowValue();
+      allPrimitiveDataTypesValue.setValue(allPrimitiveDataTypes, row);
+      Optional<Row> expectedAllPrimitiveDataTypes =
+          Optional.ofNullable(row.getRow(allPrimitiveDataTypes));
+      Optional<Map<String, XmlRowValue>> actualAllPrimitiveDataTypes =
+          Optional.ofNullable(allPrimitiveDataTypesValue.getNestedValue());
+      assertTrue(allPrimitiveDataTypes, expectedAllPrimitiveDataTypes.isPresent());
+      assertTrue(allPrimitiveDataTypes, actualAllPrimitiveDataTypes.isPresent());
+      assertEquals(
+          allPrimitiveDataTypes,
+          values(expectedAllPrimitiveDataTypes.get()),
+          values(actualAllPrimitiveDataTypes.get()));
+
+      XmlRowValue allPrimitiveDataTypesValueList = new XmlRowValue();
+      allPrimitiveDataTypesValueList.setValue(allPrimitiveDataTypesList, row);
+      Optional<Collection<Row>> expectedAllPrimitiveDataTypesList =
+          Optional.ofNullable(row.getArray(allPrimitiveDataTypesList));
+      Optional<ArrayList<XmlRowValue>> actualAllPrimitiveDataTypesList =
+          Optional.ofNullable(allPrimitiveDataTypesValueList.getValueList());
+
+      assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.isPresent());
+      assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.isPresent());
+      assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.get().isEmpty());
+      assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.get().isEmpty());
+    }
+  }
+
+  @Test
+  public void singlyNestedDataTypesRepeat() {
+    String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+    String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+    for (Row row : DATA.singlyNestedDataTypesRepeatedRows) {
+      XmlRowValue allPrimitiveDataTypesValue = new XmlRowValue();
+      allPrimitiveDataTypesValue.setValue(allPrimitiveDataTypes, row);
+      Optional<Row> expectedAllPrimitiveDataTypes =
+          Optional.ofNullable(row.getRow(allPrimitiveDataTypes));
+      Optional<Map<String, XmlRowValue>> actualAllPrimitiveDataTypes =
+          Optional.ofNullable(allPrimitiveDataTypesValue.getNestedValue());
+      assertTrue(allPrimitiveDataTypes, expectedAllPrimitiveDataTypes.isPresent());
+      assertTrue(allPrimitiveDataTypes, actualAllPrimitiveDataTypes.isPresent());
+      assertEquals(
+          allPrimitiveDataTypes,
+          values(expectedAllPrimitiveDataTypes.get()),
+          values(actualAllPrimitiveDataTypes.get()));
+
+      XmlRowValue allPrimitiveDataTypesValueList = new XmlRowValue();
+      allPrimitiveDataTypesValueList.setValue(allPrimitiveDataTypesList, row);
+      Optional<Collection<Row>> expectedAllPrimitiveDataTypesList =
+          Optional.ofNullable(row.getArray(allPrimitiveDataTypesList));
+      Optional<List<XmlRowValue>> actualAllPrimitiveDataTypesList =
+          Optional.ofNullable(allPrimitiveDataTypesValueList.getValueList());
+
+      assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.isPresent());
+      assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.isPresent());
+      assertFalse(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.get().isEmpty());
+      assertFalse(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.get().isEmpty());
+      assertEquals(
+          allPrimitiveDataTypesList,
+          valuesList(expectedAllPrimitiveDataTypesList.get()),
+          valuesList(actualAllPrimitiveDataTypesList.get()));
+    }
+  }
+
+  @Test
+  public void doublyNestedDataTypesNoRepeat() {
+    String singlyNestedDataTypes = "singlyNestedDataTypes";
+    String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+    String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+    for (Row row : DATA.doublyNestedDataTypesNoRepeatRows) {
+      XmlRowValue singlyNestedDataTypesValue = new XmlRowValue();
+      singlyNestedDataTypesValue.setValue(singlyNestedDataTypes, row);
+      Optional<Row> expectedSinglyNestedDataTypesValue =
+          Optional.ofNullable(row.getRow(singlyNestedDataTypes));
+      Optional<Map<String, XmlRowValue>> actualSinglyNestedDataTypesValue =
+          Optional.ofNullable(singlyNestedDataTypesValue.getNestedValue());
+      assertTrue(singlyNestedDataTypes, expectedSinglyNestedDataTypesValue.isPresent());
+      assertTrue(singlyNestedDataTypes, actualSinglyNestedDataTypesValue.isPresent());
+      assertNotNull(
+          singlyNestedDataTypes,
+          expectedSinglyNestedDataTypesValue.get().getValue(allPrimitiveDataTypes));
+      assertNotNull(
+          singlyNestedDataTypes, actualSinglyNestedDataTypesValue.get().get(allPrimitiveDataTypes));
+      assertNotNull(
+          singlyNestedDataTypes,
+          expectedSinglyNestedDataTypesValue.get().getValue(allPrimitiveDataTypesList));
+      assertNotNull(
+          singlyNestedDataTypes,
+          actualSinglyNestedDataTypesValue.get().get(allPrimitiveDataTypesList));
+    }
+  }
+
+  private static Map<String, Object> values(Row row) {
+    Map<String, Object> result = new HashMap<>();
+    Schema schema = row.getSchema();
+    for (String key : schema.getFieldNames()) {
+      result.put(key, row.getValue(key));
+    }
+    return result;
+  }
+
+  private static List<Map<String, Object>> valuesList(Collection<Row> values) {
+    return values.stream().map(XmlRowValueTest::values).collect(Collectors.toList());
+  }
+
+  private static Map<String, Object> values(Map<String, XmlRowValue> nested) {
+    Map<String, Object> result = new HashMap<>();
+    for (String key : nested.keySet()) {
+      result.put(key, nested.get(key).getPrimitiveValue());
+    }
+    return result;
+  }
+
+  private static List<Map<String, Object>> valuesList(List<XmlRowValue> nestedList) {
+    List<Map<String, Object>> result = new ArrayList<>();
+    for (XmlRowValue item : nestedList) {
+      Optional<Map<String, XmlRowValue>> nestedValues = Optional.ofNullable(item.getNestedValue());
+      nestedValues.ifPresent(stringXmlRowValueMap -> result.add(values(stringXmlRowValueMap)));
+    }
+    return result;
+  }
+
+  private static List<DateTime> dateTimes(List<XmlRowValue> dateTimeValues) {
+    List<DateTime> result = new ArrayList<>();
+    for (XmlRowValue item : dateTimeValues) {
+      Optional<DateTime> value = Optional.ofNullable(item.getDateTimeValue());
+      value.ifPresent(result::add);
+    }
+    return result;
+  }
+}
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java b/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
deleted file mode 100644
index f8763725e45..00000000000
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fileschematransform;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.io.Providers;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@link FileWriteSchemaTransformFormatProviders} contains {@link
- * FileWriteSchemaTransformFormatProvider} implementations.
- *
- * <p>The design goals of this class are to enable clean {@link
- * FileWriteSchemaTransformConfiguration#getFormat()} lookups that map to the appropriate {@link
- * org.apache.beam.sdk.io.FileIO.Write} that encodes the file data into the configured format.
- */
-@Internal
-public final class FileWriteSchemaTransformFormatProviders {
-  private static final String AVRO = "avro";
-  private static final String CSV = "csv";
-  private static final String JSON = "json";
-  private static final String PARQUET = "parquet";
-  private static final String XML = "xml";
-
-  /** Load all {@link FileWriteSchemaTransformFormatProvider} implementations. */
-  public static Map<String, FileWriteSchemaTransformFormatProvider> loadProviders() {
-    return Providers.loadProviders(FileWriteSchemaTransformFormatProvider.class);
-  }
-
-  /** A {@link FileWriteSchemaTransformFormatProvider} for avro format. */
-  @AutoService(FileWriteSchemaTransformFormatProvider.class)
-  public static class Avro implements FileWriteSchemaTransformFormatProvider {
-    @Override
-    public String identifier() {
-      return AVRO;
-    }
-
-    @Override
-    public PTransform<PCollection<?>, PDone> buildTransform() {
-      // TODO(https://github.com/apache/beam/issues/24472)
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /** A {@link FileWriteSchemaTransformFormatProvider} for CSV format. */
-  @AutoService(FileWriteSchemaTransformFormatProvider.class)
-  public static class Csv implements FileWriteSchemaTransformFormatProvider {
-
-    @Override
-    public String identifier() {
-      return CSV;
-    }
-
-    @Override
-    public PTransform<PCollection<?>, PDone> buildTransform() {
-      // TODO(https://github.com/apache/beam/issues/24472)
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /** A {@link FileWriteSchemaTransformFormatProvider} for JSON format. */
-  @AutoService(FileWriteSchemaTransformFormatProvider.class)
-  public static class Json implements FileWriteSchemaTransformFormatProvider {
-    @Override
-    public String identifier() {
-      return JSON;
-    }
-
-    @Override
-    public PTransform<PCollection<?>, PDone> buildTransform() {
-      // TODO(https://github.com/apache/beam/issues/24472)
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /** A {@link FileWriteSchemaTransformFormatProvider} for Parquet format. */
-  @AutoService(FileWriteSchemaTransformFormatProvider.class)
-  public static class Parquet implements FileWriteSchemaTransformFormatProvider {
-    @Override
-    public String identifier() {
-      return PARQUET;
-    }
-
-    @Override
-    public PTransform<PCollection<?>, PDone> buildTransform() {
-      // TODO(https://github.com/apache/beam/issues/24472)
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /** A {@link FileWriteSchemaTransformFormatProvider} for XML format. */
-  @AutoService(FileWriteSchemaTransformFormatProvider.class)
-  public static class Xml implements FileWriteSchemaTransformFormatProvider {
-    @Override
-    public String identifier() {
-      return XML;
-    }
-
-    @Override
-    public PTransform<PCollection<?>, PDone> buildTransform() {
-      // TODO(https://github.com/apache/beam/issues/24472)
-      throw new UnsupportedOperationException();
-    }
-  }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4455981a3fd..b8879bfa3f3 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -185,7 +185,7 @@ include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
 include(":sdks:java:io:cdap")
 include(":sdks:java:io:csv")
-include(":sdks:java:io:fileschematransform")
+include(":sdks:java:io:file-schema-transform")
 include(":sdks:java:io:google-cloud-platform")
 include(":sdks:java:io:google-cloud-platform:expansion-service")
 include(":sdks:java:io:hadoop-common")