Posted to commits@beam.apache.org by jo...@apache.org on 2023/02/03 17:43:56 UTC
[beam] branch master updated: 24472 Implement FileWriteSchemaTransformProvider (#24806)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b3be8b65aff 24472 Implement FileWriteSchemaTransformProvider (#24806)
b3be8b65aff is described below
commit b3be8b65aff0649e52c4d1c7e1d19e7da34da342
Author: Damon <da...@users.noreply.github.com>
AuthorDate: Fri Feb 3 09:43:49 2023 -0800
24472 Implement FileWriteSchemaTransformProvider (#24806)
* Implement FileWriteSchemaTransformProvider
* Final cleanup
* Remove unused code
* Refactor file structure; add more unit tests
* XmlRowAdapter Tests
* Remove unused class
* Remove unused test code
* Further cleanup
* Convert PDone to PCollection<String>
* WIP
* Refactor tests
* wip
* Final cleanup
* Revert settings.gradle.kts
* Patch SchemaAwareJavaBeansTest
* Remove 2nd call to readpipeline run
* Set numshards=1 for xml content tests
* Patch code comments
* Update OWNERS
* Comply with Python SDK external transforms
* Refactor Python SDK compliant String check
---
.../beam/sdk/io/common/SchemaAwareJavaBeans.java | 494 +++++++++++++++++++++
.../sdk/io/common/SchemaAwareJavaBeansTest.java | 175 ++++++++
.../OWNERS | 3 +
.../build.gradle | 20 +-
.../AvroWriteSchemaTransformFormatProvider.java | 88 ++++
.../CsvWriteSchemaTransformFormatProvider.java} | 31 +-
.../FileWriteSchemaTransformConfiguration.java | 66 +--
.../FileWriteSchemaTransformFormatProvider.java | 16 +-
.../FileWriteSchemaTransformFormatProviders.java | 148 ++++++
.../FileWriteSchemaTransformProvider.java | 191 ++++++++
.../JsonWriteSchemaTransformFormatProvider.java | 93 ++++
.../ParquetWriteSchemaTransformFormatProvider.java | 117 +++++
.../fileschematransform/XmlDateTimeAdapter.java} | 36 +-
.../sdk/io/fileschematransform/XmlRowAdapter.java | 93 ++++
.../sdk/io/fileschematransform/XmlRowValue.java | 268 +++++++++++
.../XmlWriteSchemaTransformFormatProvider.java | 115 +++++
.../sdk/io/fileschematransform/package-info.java | 0
...FileWriteSchemaTransformFormatProviderTest.java | 104 +++++
...FileWriteSchemaTransformFormatProviderTest.java | 434 ++++++++++++++++++
...WriteSchemaTransformFormatProviderTestData.java | 231 ++++++++++
...ileWriteSchemaTransformFormatProvidersTest.java | 17 +-
.../FileWriteSchemaTransformProviderTest.java | 127 ++++++
...FileWriteSchemaTransformFormatProviderTest.java | 104 +++++
...FileWriteSchemaTransformFormatProviderTest.java | 112 +++++
.../XmlDateTimeAdapterTest.java} | 32 +-
...FileWriteSchemaTransformFormatProviderTest.java | 120 +++++
.../io/fileschematransform/XmlRowAdapterTest.java | 190 ++++++++
.../io/fileschematransform/XmlRowValueTest.java | 391 ++++++++++++++++
.../FileWriteSchemaTransformFormatProviders.java | 124 ------
settings.gradle.kts | 2 +-
30 files changed, 3707 insertions(+), 235 deletions(-)
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java
new file mode 100644
index 00000000000..702bc29c90d
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+
+/** Various Java Beans and associated schemas used in tests. */
+public class SchemaAwareJavaBeans {
+
+ private static final DefaultSchemaProvider DEFAULT_SCHEMA_PROVIDER = new DefaultSchemaProvider();
+
+ /** Convenience method for {@link AllPrimitiveDataTypes} instantiation. */
+ public static AllPrimitiveDataTypes allPrimitiveDataTypes(
+ Boolean aBoolean,
+ Byte aByte,
+ BigDecimal aDecimal,
+ Double aDouble,
+ Float aFloat,
+ Short aShort,
+ Integer anInteger,
+ Long aLong,
+ String aString) {
+ return new AutoValue_SchemaAwareJavaBeans_AllPrimitiveDataTypes.Builder()
+ .setABoolean(aBoolean)
+ .setAByte(aByte)
+ .setADecimal(aDecimal)
+ .setADouble(aDouble)
+ .setAFloat(aFloat)
+ .setAShort(aShort)
+ .setAnInteger(anInteger)
+ .setALong(aLong)
+ .setAString(aString)
+ .build();
+ }
+
+ /** Convenience method for {@link NullableAllPrimitiveDataTypes} instantiation. */
+ public static NullableAllPrimitiveDataTypes nullableAllPrimitiveDataTypes(
+ @Nullable Boolean aBoolean,
+ @Nullable Double aDouble,
+ @Nullable Float aFloat,
+ @Nullable Integer anInteger,
+ @Nullable Long aLong,
+ @Nullable String aString) {
+ return new AutoValue_SchemaAwareJavaBeans_NullableAllPrimitiveDataTypes.Builder()
+ .setABoolean(aBoolean)
+ .setADouble(aDouble)
+ .setAFloat(aFloat)
+ .setAnInteger(anInteger)
+ .setALong(aLong)
+ .setAString(aString)
+ .build();
+ }
+
+ /** Convenience method for {@link TimeContaining} instantiation. */
+ public static TimeContaining timeContaining(Instant instant, List<Instant> instantList) {
+ return new AutoValue_SchemaAwareJavaBeans_TimeContaining.Builder()
+ .setInstant(instant)
+ .setInstantList(instantList)
+ .build();
+ }
+
+ /** Convenience method for {@link ArrayPrimitiveDataTypes} instantiation. */
+ public static ArrayPrimitiveDataTypes arrayPrimitiveDataTypes(
+ List<Boolean> booleans,
+ List<Double> doubles,
+ List<Float> floats,
+ List<Short> shorts,
+ List<Integer> integers,
+ List<Long> longs,
+ List<String> strings) {
+ return new AutoValue_SchemaAwareJavaBeans_ArrayPrimitiveDataTypes.Builder()
+ .setBooleanList(booleans)
+ .setDoubleList(doubles)
+ .setFloatList(floats)
+ .setShortList(shorts)
+ .setIntegerList(integers)
+ .setLongList(longs)
+ .setStringList(strings)
+ .build();
+ }
+
+ /** Convenience method for {@link SinglyNestedDataTypes} instantiation. */
+ public static SinglyNestedDataTypes singlyNestedDataTypes(
+ AllPrimitiveDataTypes allPrimitiveDataTypes, AllPrimitiveDataTypes... repeated) {
+ return new AutoValue_SchemaAwareJavaBeans_SinglyNestedDataTypes.Builder()
+ .setAllPrimitiveDataTypes(allPrimitiveDataTypes)
+ .setAllPrimitiveDataTypesList(Arrays.stream(repeated).collect(Collectors.toList()))
+ .build();
+ }
+
+ /** Convenience method for {@link DoublyNestedDataTypes} instantiation. */
+ public static DoublyNestedDataTypes doublyNestedDataTypes(
+ SinglyNestedDataTypes singlyNestedDataTypes, SinglyNestedDataTypes... repeated) {
+ return new AutoValue_SchemaAwareJavaBeans_DoublyNestedDataTypes.Builder()
+ .setSinglyNestedDataTypes(singlyNestedDataTypes)
+ .setSinglyNestedDataTypesList(Arrays.stream(repeated).collect(Collectors.toList()))
+ .build();
+ }
+
+ private static final TypeDescriptor<AllPrimitiveDataTypes>
+ ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(AllPrimitiveDataTypes.class);
+
+ /** The schema for {@link AllPrimitiveDataTypes}. */
+ public static final Schema ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link AllPrimitiveDataTypes} to a
+ * {@link Row}.
+ */
+ public static SerializableFunction<AllPrimitiveDataTypes, Row> allPrimitiveDataTypesToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * AllPrimitiveDataTypes}.
+ */
+ public static SerializableFunction<Row, AllPrimitiveDataTypes> allPrimitiveDataTypesFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ private static final TypeDescriptor<NullableAllPrimitiveDataTypes>
+ NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(NullableAllPrimitiveDataTypes.class);
+
+ /** The schema for {@link NullableAllPrimitiveDataTypes}. */
+ public static final Schema NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link NullableAllPrimitiveDataTypes}
+ * to a {@link Row}.
+ */
+ public static SerializableFunction<NullableAllPrimitiveDataTypes, Row>
+ nullableAllPrimitiveDataTypesToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * NullableAllPrimitiveDataTypes}.
+ */
+ public static SerializableFunction<Row, NullableAllPrimitiveDataTypes>
+ nullableAllPrimitiveDataTypesFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(
+ NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ private static final TypeDescriptor<TimeContaining> TIME_CONTAINING_TYPE_DESCRIPTOR =
+ TypeDescriptor.of(TimeContaining.class);
+
+ /** The schema for {@link TimeContaining}. */
+ public static final Schema TIME_CONTAINING_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(TIME_CONTAINING_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link TimeContaining} to a {@link
+ * Row}.
+ */
+ public static SerializableFunction<TimeContaining, Row> timeContainingToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * TimeContaining}.
+ */
+ public static SerializableFunction<Row, TimeContaining> timeContainingFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(TIME_CONTAINING_TYPE_DESCRIPTOR);
+ }
+
+ private static final TypeDescriptor<ArrayPrimitiveDataTypes>
+ ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(ArrayPrimitiveDataTypes.class);
+
+ /** The schema for {@link ArrayPrimitiveDataTypes}. */
+ public static final Schema ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link ArrayPrimitiveDataTypes} to a
+ * {@link Row}.
+ */
+ public static SerializableFunction<ArrayPrimitiveDataTypes, Row>
+ arrayPrimitiveDataTypesToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * ArrayPrimitiveDataTypes}.
+ */
+ public static SerializableFunction<Row, ArrayPrimitiveDataTypes>
+ arrayPrimitiveDataTypesFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(ARRAY_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ private static final TypeDescriptor<SinglyNestedDataTypes>
+ SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(SinglyNestedDataTypes.class);
+
+ /** The schema for {@link SinglyNestedDataTypes}. */
+ public static final Schema SINGLY_NESTED_DATA_TYPES_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link SinglyNestedDataTypes} to a
+ * {@link Row}.
+ */
+ public static SerializableFunction<SinglyNestedDataTypes, Row> singlyNestedDataTypesToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * SinglyNestedDataTypes}.
+ */
+ public static SerializableFunction<Row, SinglyNestedDataTypes> singlyNestedDataTypesFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(SINGLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ private static final TypeDescriptor<DoublyNestedDataTypes>
+ DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR = TypeDescriptor.of(DoublyNestedDataTypes.class);
+
+ /** The schema for {@link DoublyNestedDataTypes}. */
+ public static final Schema DOUBLY_NESTED_DATA_TYPES_SCHEMA =
+ DEFAULT_SCHEMA_PROVIDER.schemaFor(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link DoublyNestedDataTypes} to a
+ * {@link Row}.
+ */
+ public static SerializableFunction<DoublyNestedDataTypes, Row> doublyNestedDataTypesToRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.toRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Returns a {@link SerializableFunction} to convert from a {@link Row} to a {@link
+ * DoublyNestedDataTypes}.
+ */
+ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataTypesFromRowFn() {
+ return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
+ }
+
+ /**
+ * Contains all primitive Java types (String, Integer, etc.) and {@link BigDecimal}. The purpose
+ * of this class is to test schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class AllPrimitiveDataTypes implements Serializable {
+
+ public abstract Boolean getABoolean();
+
+ public abstract Byte getAByte();
+
+ public abstract BigDecimal getADecimal();
+
+ public abstract Double getADouble();
+
+ public abstract Float getAFloat();
+
+ public abstract Short getAShort();
+
+ public abstract Integer getAnInteger();
+
+ public abstract Long getALong();
+
+ public abstract String getAString();
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setABoolean(Boolean value);
+
+ public abstract Builder setAByte(Byte value);
+
+ public abstract Builder setADecimal(BigDecimal value);
+
+ public abstract Builder setADouble(Double value);
+
+ public abstract Builder setAFloat(Float value);
+
+ public abstract Builder setAShort(Short value);
+
+ public abstract Builder setAnInteger(Integer value);
+
+ public abstract Builder setALong(Long value);
+
+ public abstract Builder setAString(String value);
+
+ public abstract AllPrimitiveDataTypes build();
+ }
+ }
+
+ /**
+ * Contains all nullable primitive Java types (String, Integer, etc.) and {@link BigDecimal}.
+ * The purpose of this class is to test schema-aware PTransforms with flat {@link Schema} {@link
+ * Row}s.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class NullableAllPrimitiveDataTypes implements Serializable {
+
+ @Nullable
+ public abstract Boolean getABoolean();
+
+ @Nullable
+ public abstract Double getADouble();
+
+ @Nullable
+ public abstract Float getAFloat();
+
+ @Nullable
+ public abstract Integer getAnInteger();
+
+ @Nullable
+ public abstract Long getALong();
+
+ @Nullable
+ public abstract String getAString();
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setABoolean(Boolean value);
+
+ public abstract Builder setADouble(Double value);
+
+ public abstract Builder setAFloat(Float value);
+
+ public abstract Builder setAnInteger(Integer value);
+
+ public abstract Builder setALong(Long value);
+
+ public abstract Builder setAString(String value);
+
+ public abstract NullableAllPrimitiveDataTypes build();
+ }
+ }
+
+ /**
+ * Contains time-related types. The purpose of this class is to test schema-aware PTransforms
+ * with {@link Row}s containing time-related {@link Schema.FieldType}s.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class TimeContaining {
+
+ public abstract Instant getInstant();
+
+ public abstract List<Instant> getInstantList();
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setInstant(Instant value);
+
+ public abstract Builder setInstantList(List<Instant> value);
+
+ public abstract TimeContaining build();
+ }
+ }
+
+ /**
+ * Contains arrays of all primitive Java types (String, Integer, etc.) and {@link BigDecimal}.
+ * The purpose of this class is to test schema-aware PTransforms with {@link Row}s containing
+ * repeated primitive Java types.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class ArrayPrimitiveDataTypes {
+
+ public abstract List<Boolean> getBooleanList();
+
+ public abstract List<Double> getDoubleList();
+
+ public abstract List<Float> getFloatList();
+
+ public abstract List<Short> getShortList();
+
+ public abstract List<Integer> getIntegerList();
+
+ public abstract List<Long> getLongList();
+
+ public abstract List<String> getStringList();
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setBooleanList(List<Boolean> value);
+
+ public abstract Builder setDoubleList(List<Double> value);
+
+ public abstract Builder setFloatList(List<Float> value);
+
+ public abstract Builder setShortList(List<Short> value);
+
+ public abstract Builder setIntegerList(List<Integer> value);
+
+ public abstract Builder setLongList(List<Long> value);
+
+ public abstract Builder setStringList(List<String> value);
+
+ public abstract ArrayPrimitiveDataTypes build();
+ }
+ }
+
+ /**
+ * Contains a singly nested and repeated {@link AllPrimitiveDataTypes}. The purpose of this class
+ * is to test schema-aware PTransforms with {@link Row}s containing nested and repeated complex
+ * Java types.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class SinglyNestedDataTypes {
+
+ public abstract AllPrimitiveDataTypes getAllPrimitiveDataTypes();
+
+ public abstract List<AllPrimitiveDataTypes> getAllPrimitiveDataTypesList();
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setAllPrimitiveDataTypes(AllPrimitiveDataTypes value);
+
+ public abstract Builder setAllPrimitiveDataTypesList(List<AllPrimitiveDataTypes> value);
+
+ public abstract SinglyNestedDataTypes build();
+ }
+ }
+
+ /**
+ * Contains a nested and repeated {@link SinglyNestedDataTypes}. The purpose of this class is to
+ * test schema-aware PTransforms with {@link Row}s containing deeper nested and repeated complex
+ * Java types.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class DoublyNestedDataTypes {
+
+ public abstract SinglyNestedDataTypes getSinglyNestedDataTypes();
+
+ public abstract List<SinglyNestedDataTypes> getSinglyNestedDataTypesList();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setSinglyNestedDataTypes(SinglyNestedDataTypes value);
+
+ public abstract Builder setSinglyNestedDataTypesList(List<SinglyNestedDataTypes> value);
+
+ public abstract DoublyNestedDataTypes build();
+ }
+ }
+}
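
For orientation, a minimal round-trip sketch using only the helpers defined in this file
(the test file below exercises the same pattern):

    // Sketch: bean -> Row -> bean round trip with the convenience functions above.
    AllPrimitiveDataTypes element =
        allPrimitiveDataTypes(
            false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
    Row row = allPrimitiveDataTypesToRowFn().apply(element);
    AllPrimitiveDataTypes restored = allPrimitiveDataTypesFromRowFn().apply(row);
    // restored equals element; the Row's schema is ALL_PRIMITIVE_DATA_TYPES_SCHEMA.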
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java
new file mode 100644
index 00000000000..76de0b75501
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeansTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ArrayPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DoublyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SinglyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SchemaAwareJavaBeans}. */
+@RunWith(JUnit4.class)
+public class SchemaAwareJavaBeansTest {
+ @Test
+ public void allPrimitiveDataTypesRowFns() {
+ AllPrimitiveDataTypes element =
+ allPrimitiveDataTypes(
+ false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+
+ Row row = allPrimitiveDataTypesToRowFn().apply(element);
+ assertEquals(element, allPrimitiveDataTypesFromRowFn().apply(row));
+ }
+
+ @Test
+ public void nullableAllPrimitiveDataTypesRowFns() {
+ NullableAllPrimitiveDataTypes allNull =
+ nullableAllPrimitiveDataTypes(null, null, null, null, null, null);
+ Row allNullRow = nullableAllPrimitiveDataTypesToRowFn().apply(allNull);
+ assertEquals(allNull, nullableAllPrimitiveDataTypesFromRowFn().apply(allNullRow));
+
+ NullableAllPrimitiveDataTypes nonNull =
+ nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, "a");
+ Row nonNullRow = nullableAllPrimitiveDataTypesToRowFn().apply(nonNull);
+ assertEquals(nonNull, nullableAllPrimitiveDataTypesFromRowFn().apply(nonNullRow));
+ }
+
+ @Test
+ public void timeContainingRowFns() {
+ TimeContaining element =
+ timeContaining(
+ Instant.ofEpochMilli(1L),
+ Arrays.asList(
+ Instant.ofEpochMilli(2L), Instant.ofEpochMilli(3L), Instant.ofEpochMilli(4L)));
+ Row row = timeContainingToRowFn().apply(element);
+ assertEquals(element, timeContainingFromRowFn().apply(row));
+ }
+
+ @Test
+ public void arrayPrimitiveDataTypesRowFns() {
+ ArrayPrimitiveDataTypes allEmpty =
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+ Row allEmptyRow = arrayPrimitiveDataTypesToRowFn().apply(allEmpty);
+ assertEquals(allEmpty, arrayPrimitiveDataTypesFromRowFn().apply(allEmptyRow));
+
+ ArrayPrimitiveDataTypes noneEmpty =
+ arrayPrimitiveDataTypes(
+ Stream.generate(() -> true).limit(Short.MAX_VALUE).collect(Collectors.toList()),
+ Stream.generate(() -> Double.MIN_VALUE)
+ .limit(Short.MAX_VALUE)
+ .collect(Collectors.toList()),
+ Stream.generate(() -> Float.MIN_VALUE)
+ .limit(Short.MAX_VALUE)
+ .collect(Collectors.toList()),
+ Stream.generate(() -> Short.MIN_VALUE)
+ .limit(Short.MAX_VALUE)
+ .collect(Collectors.toList()),
+ Stream.generate(() -> Integer.MIN_VALUE)
+ .limit(Short.MAX_VALUE)
+ .collect(Collectors.toList()),
+ Stream.generate(() -> Long.MIN_VALUE)
+ .limit(Short.MAX_VALUE)
+ .collect(Collectors.toList()),
+ Stream.generate(() -> "🐿").limit(Short.MAX_VALUE).collect(Collectors.toList()));
+ Row noneEmptyRow = arrayPrimitiveDataTypesToRowFn().apply(noneEmpty);
+ assertEquals(noneEmpty, arrayPrimitiveDataTypesFromRowFn().apply(noneEmptyRow));
+ }
+
+ @Test
+ public void singlyNestedDataTypesRowFns() {
+ AllPrimitiveDataTypes element =
+ allPrimitiveDataTypes(
+ false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+ SinglyNestedDataTypes notRepeated = singlyNestedDataTypes(element);
+ SinglyNestedDataTypes repeated = singlyNestedDataTypes(element, element, element, element);
+ Row notRepeatedRow = singlyNestedDataTypesToRowFn().apply(notRepeated);
+ Row repeatedRow = singlyNestedDataTypesToRowFn().apply(repeated);
+ assertEquals(notRepeated, singlyNestedDataTypesFromRowFn().apply(notRepeatedRow));
+ assertEquals(repeated, singlyNestedDataTypesFromRowFn().apply(repeatedRow));
+ }
+
+ @Test
+ public void doublyNestedDataTypesRowFns() {
+ AllPrimitiveDataTypes element =
+ allPrimitiveDataTypes(
+ false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a");
+ DoublyNestedDataTypes d0s0 = doublyNestedDataTypes(singlyNestedDataTypes(element));
+ DoublyNestedDataTypes d1s0 =
+ doublyNestedDataTypes(
+ singlyNestedDataTypes(element),
+ singlyNestedDataTypes(element),
+ singlyNestedDataTypes(element),
+ singlyNestedDataTypes(element));
+ DoublyNestedDataTypes d0s1 =
+ doublyNestedDataTypes(singlyNestedDataTypes(element, element, element, element));
+ DoublyNestedDataTypes d1s1 =
+ doublyNestedDataTypes(
+ singlyNestedDataTypes(element, element, element, element),
+ singlyNestedDataTypes(element, element, element, element),
+ singlyNestedDataTypes(element, element, element, element),
+ singlyNestedDataTypes(element, element, element, element));
+
+ Row d0s0Row = doublyNestedDataTypesToRowFn().apply(d0s0);
+ Row d1s0Row = doublyNestedDataTypesToRowFn().apply(d1s0);
+ Row d0s1Row = doublyNestedDataTypesToRowFn().apply(d0s1);
+ Row d1s1Row = doublyNestedDataTypesToRowFn().apply(d1s1);
+
+ assertEquals(d0s0, doublyNestedDataTypesFromRowFn().apply(d0s0Row));
+ assertEquals(d1s0, doublyNestedDataTypesFromRowFn().apply(d1s0Row));
+ assertEquals(d0s1, doublyNestedDataTypesFromRowFn().apply(d0s1Row));
+ assertEquals(d1s1, doublyNestedDataTypesFromRowFn().apply(d1s1Row));
+ }
+}
diff --git a/sdks/java/io/fileschematransform/OWNERS b/sdks/java/io/file-schema-transform/OWNERS
similarity index 55%
rename from sdks/java/io/fileschematransform/OWNERS
rename to sdks/java/io/file-schema-transform/OWNERS
index c207accc003..aed54489c35 100644
--- a/sdks/java/io/fileschematransform/OWNERS
+++ b/sdks/java/io/file-schema-transform/OWNERS
@@ -1,5 +1,8 @@
# See the OWNERS docs at https://s.apache.org/beam-owners
+# Reviewers below listed in alphabetic order
reviewers:
+ - ahmedabu98
+ - damondouglas
- johnjcasey
- pabloem
diff --git a/sdks/java/io/fileschematransform/build.gradle b/sdks/java/io/file-schema-transform/build.gradle
similarity index 70%
rename from sdks/java/io/fileschematransform/build.gradle
rename to sdks/java/io/file-schema-transform/build.gradle
index f443dca13c0..ac650692f2d 100644
--- a/sdks/java/io/fileschematransform/build.gradle
+++ b/sdks/java/io/file-schema-transform/build.gradle
@@ -32,15 +32,23 @@ configurations.implementation {
}
}
-// exclude auto-generated classes and integration tests
-def jacocoExcludes = [
- '**/AutoValue_*',
- '**/*IT*',
-]
+def parquet_version = "1.12.0"
dependencies {
- implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.avro
implementation library.java.commons_csv
+ implementation library.java.jaxb_api
+ implementation library.java.joda_time
+ implementation library.java.vendored_guava_26_0_jre
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation project(path: ":sdks:java:io:parquet")
+ implementation "org.apache.parquet:parquet-common:$parquet_version"
+ implementation project(path: ":sdks:java:io:xml")
+
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:core", configuration: "shadow")
+ testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
+ testImplementation project(path: ":sdks:java:io:parquet")
+ testImplementation project(path: ":sdks:java:io:xml")
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..75cf841beab
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.getNumShards;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.getShardNameTemplate;
+
+import com.google.auto.service.AutoService;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for avro format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class AvroWriteSchemaTransformFormatProvider
+ implements FileWriteSchemaTransformFormatProvider {
+
+ @Override
+ public String identifier() {
+ return FileWriteSchemaTransformFormatProviders.AVRO;
+ }
+
+ /**
+ * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into a resulting
+ * {@link PCollection} of the file names written using {@link AvroIO.Write}.
+ */
+ @Override
+ public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+
+ return new PTransform<PCollection<Row>, PCollection<String>>() {
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+ AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+
+ PCollection<GenericRecord> avro =
+ input
+ .apply(
+ "Row To Avro Generic Record",
+ FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
+ .setCoder(coder);
+
+ AvroIO.Write<GenericRecord> write =
+ AvroIO.writeGenericRecords(avroSchema).to(configuration.getFilenamePrefix());
+
+ if (configuration.getNumShards() != null) {
+ int numShards = getNumShards(configuration);
+ // Python SDK external transforms do not support null values, requiring an additional check.
+ if (numShards > 0) {
+ write = write.withNumShards(numShards);
+ }
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getShardNameTemplate())) {
+ write = write.withShardNameTemplate(getShardNameTemplate(configuration));
+ }
+
+ return avro.apply("Write Avro", write.withOutputFilenames())
+ .getPerDestinationOutputFilenames()
+ .apply("perDestinationOutputFilenames", Values.create());
+ }
+ };
+ }
+}
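
As a hedged sketch (not part of this commit), the provider's transform could be applied
directly as follows; `config`, `schema`, and `rows` are assumed to exist, and in practice
the provider is resolved through FileWriteSchemaTransformProvider rather than instantiated
by hand:

    // Sketch: applying the Avro format provider's transform to a Row collection.
    // Assumes `config` is a FileWriteSchemaTransformConfiguration with format "avro"
    // and a filename prefix, and `rows` is a PCollection<Row> with Beam schema `schema`.
    FileWriteSchemaTransformFormatProvider provider =
        new AvroWriteSchemaTransformFormatProvider();
    PCollection<String> writtenFiles =
        rows.apply("Write Avro Rows", provider.buildTransform(config, schema));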
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
similarity index 56%
copy from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
copy to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
index a6db5ffabe9..5c6b803cde1 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/CsvWriteSchemaTransformFormatProvider.java
@@ -17,21 +17,26 @@
*/
package org.apache.beam.sdk.io.fileschematransform;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.io.Providers;
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
-/**
- * Provides a {@link PTransform} that consumes a {@link PCollection} according to a registered
- * {@link com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
- * implementation. See {@link FileWriteSchemaTransformFormatProviders} for a list of available
- * formats.
- */
-@Internal
-public interface FileWriteSchemaTransformFormatProvider extends Providers.Identifyable {
+/** A {@link FileWriteSchemaTransformFormatProvider} for CSV format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class CsvWriteSchemaTransformFormatProvider
+ implements FileWriteSchemaTransformFormatProvider {
+
+ @Override
+ public String identifier() {
+ return FileWriteSchemaTransformFormatProviders.CSV;
+ }
- /** Builds a {@link PTransform} that consumes {@link PCollection}. */
- PTransform<PCollection<?>, PDone> buildTransform();
+ @Override
+ public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+ // TODO(https://github.com/apache/beam/issues/24469)
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
similarity index 77%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
index 10af9b99430..160d51dd293 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformConfiguration.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.fileschematransform;
import com.google.auto.value.AutoValue;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -46,22 +48,22 @@ public abstract class FileWriteSchemaTransformConfiguration {
}
public static XmlConfiguration.Builder xmlConfigurationBuilder() {
- return new AutoValue_FileWriteSchemaTransformConfiguration_XmlConfiguration.Builder();
+ return new AutoValue_FileWriteSchemaTransformConfiguration_XmlConfiguration.Builder()
+ .setCharset(StandardCharsets.UTF_8.name());
}
/**
- * The format of the file content. Used as {@link String} key lookup of {@link
+ * The format of the file content, used as the String key into {@link
* FileWriteSchemaTransformFormatProviders#loadProviders()}.
*/
public abstract String getFormat();
- /** Specifies a common prefix to use for all generated filenames. */
+ /** A common prefix to use for all generated filenames. */
public abstract String getFilenamePrefix();
/**
- * Specifies to compress all generated shard files by default, append the respective extension to
- * the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values, though
- * stringified in all lowercase format.
+ * The compression of all generated shard files. By default, appends the respective extension to
+ * the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values.
*/
@Nullable
public abstract String getCompression();
@@ -90,22 +92,20 @@ public abstract class FileWriteSchemaTransformConfiguration {
@Nullable
public abstract XmlConfiguration getXmlConfiguration();
+ abstract Builder toBuilder();
+
@AutoValue.Builder
public abstract static class Builder {
- /**
- * The format of the file content. Used as {@link String} key lookup of {@link
- * FileWriteSchemaTransformFormatProviders#loadProviders()}.
- */
+ /** The format of the file content. See {@link #getFormat()} for more details. */
public abstract Builder setFormat(String value);
- /** Specifies a common prefix to use for all generated filenames. */
+ /** A common prefix to use for all generated filenames. */
public abstract Builder setFilenamePrefix(String value);
/**
- * Specifies to compress all generated shard files by default, append the respective extension
- * to the filename. See {@link org.apache.beam.sdk.io.Compression} for expected values, though
- * stringified in all lowercase format.
+ * The file {@link org.apache.beam.sdk.io.Compression}. See {@link #getCompression()} for more
+ * details.
*/
public abstract Builder setCompression(String value);
@@ -135,23 +135,10 @@ public abstract class FileWriteSchemaTransformConfiguration {
@AutoValue
public abstract static class CsvConfiguration {
- /**
- * Not to be confused with the CSV header, it is content written to the top of every sharded
- * file prior to the header. In the example below, all the text proceeding the header
- * 'column1,column2,column3' is the preamble.
- *
- * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator:
- * John Doe
- *
- * <p>column1,column2,column3 1,2,3 4,5,6
- */
- @Nullable
- public abstract String getPreamble();
-
/**
* The format of the written CSV file. See <a
* href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html">org.apache.commons.csv.CSVFormat</a>
- * for allowed values, stringified in lowercase. Defaults to <a
+ * for allowed values. Defaults to <a
* href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html#DEFAULT">CSVFormat.DEFAULT</a>
*/
public abstract String getCsvFormat();
@@ -159,22 +146,10 @@ public abstract class FileWriteSchemaTransformConfiguration {
@AutoValue.Builder
public abstract static class Builder {
- /**
- * Not to be confused with the CSV header, it is content written to the top of every sharded
- * file prior to the header. In the example below, all the text proceeding the header
- * 'column1,column2,column3' is the preamble.
- *
- * <p>Fake company, Inc. Lab experiment: abcdefg123456 Experiment date: 2022-12-05 Operator:
- * John Doe
- *
- * <p>column1,column2,column3 1,2,3 4,5,6
- */
- public abstract Builder setPreamble(String value);
-
/**
* The format of the written CSV file. See <a
* href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html">org.apache.commons.csv.CSVFormat</a>
- * for allowed values, stringified in lowercase. Defaults to <a
+ * for allowed values. Defaults to <a
* href="https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html#DEFAULT">CSVFormat.DEFAULT</a>
*/
public abstract Builder setCsvFormat(String value);
@@ -190,11 +165,12 @@ public abstract class FileWriteSchemaTransformConfiguration {
/**
* Specifies compression codec. See org.apache.parquet.hadoop.metadata.CompressionCodecName for
- * allowed names, stringified in lowercase format.
+ * allowed names.
*/
public abstract String getCompressionCodecName();
/** Specify row-group size; if not set or zero, a default is used by the underlying writer. */
+ @Nullable
public abstract Integer getRowGroupSize();
@AutoValue.Builder
@@ -202,7 +178,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
/**
* Specifies compression codec. See org.apache.parquet.hadoop.metadata.CompressionCodecName
- * for allowed names, stringified in lowercase format.
+ * for allowed names.
*/
public abstract Builder setCompressionCodecName(String value);
@@ -223,7 +199,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
/**
* The charset used to write the file. Defaults to {@link
- * java.nio.charset.StandardCharsets#UTF_8}.
+ * java.nio.charset.StandardCharsets#UTF_8}'s {@link Charset#name()}.
*/
public abstract String getCharset();
@@ -235,7 +211,7 @@ public abstract class FileWriteSchemaTransformConfiguration {
/**
* The charset used to write the file. Defaults to {@link
- * java.nio.charset.StandardCharsets#UTF_8}.
+ * java.nio.charset.StandardCharsets#UTF_8}'s {@link Charset#name()}.
*/
public abstract Builder setCharset(String value);
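
The practical effect of the new builder default is that XML writes pick up UTF-8 without
any explicit charset; a minimal sketch, assuming only the builder methods visible in this
hunk:

    // Sketch: xmlConfigurationBuilder() now pre-sets the charset to UTF-8,
    // so setCharset is only needed to choose a different encoding.
    XmlConfiguration.Builder xml =
        FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder();
    xml.setCharset(StandardCharsets.ISO_8859_1.name()); // optional override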
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
similarity index 66%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
index a6db5ffabe9..6eaec310cd2 100644
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvider.java
@@ -18,20 +18,26 @@
package org.apache.beam.sdk.io.fileschematransform;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.Providers;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
/**
- * Provides a {@link PTransform} that consumes a {@link PCollection} according to a registered
- * {@link com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
+ * Provides a {@link PTransform} that writes a {@link PCollection} of {@link Row}s and outputs a
+ * {@link PCollection} of the file names according to a registered {@link
+ * com.google.auto.service.AutoService} {@link FileWriteSchemaTransformFormatProvider}
* implementation. See {@link FileWriteSchemaTransformFormatProviders} for a list of available
* formats.
*/
@Internal
public interface FileWriteSchemaTransformFormatProvider extends Providers.Identifyable {
- /** Builds a {@link PTransform} that consumes {@link PCollection}. */
- PTransform<PCollection<?>, PDone> buildTransform();
+ /**
+ * Builds a {@link PTransform} that writes a {@link Row} {@link PCollection} and outputs the
+ * resulting {@link PCollection} of the file names.
+ */
+ PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema);
}
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
new file mode 100644
index 00000000000..8ed73a6b11d
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * {@link FileWriteSchemaTransformFormatProviders} contains {@link
+ * FileWriteSchemaTransformFormatProvider} implementations.
+ *
+ * <p>The design goal of this class is to enable clean {@link
+ * FileWriteSchemaTransformConfiguration#getFormat()} lookups mapping to the appropriate {@link
+ * org.apache.beam.sdk.io.FileIO.Write} that encodes the file data into the configured format.
+ */
+@Internal
+public final class FileWriteSchemaTransformFormatProviders {
+ static final String AVRO = "avro";
+ static final String CSV = "csv";
+ static final String JSON = "json";
+ static final String PARQUET = "parquet";
+ static final String XML = "xml";
+
+ /** Load all {@link FileWriteSchemaTransformFormatProvider} implementations. */
+ public static Map<String, FileWriteSchemaTransformFormatProvider> loadProviders() {
+ return Providers.loadProviders(FileWriteSchemaTransformFormatProvider.class);
+ }
+
+ /** Builds a {@link MapElements} transform to map {@link Row}s to {@link GenericRecord}s. */
+ static MapElements<Row, GenericRecord> mapRowsToGenericRecords(Schema beamSchema) {
+ return MapElements.into(TypeDescriptor.of(GenericRecord.class))
+ .via(AvroUtils.getRowToGenericRecordFunction(AvroUtils.toAvroSchema(beamSchema)));
+ }
+
+ /**
+ * Applies common parameters from {@link FileWriteSchemaTransformConfiguration} to {@link
+ * FileIO.Write}.
+ */
+ static <T> FileIO.Write<Void, T> applyCommonFileIOWriteFeatures(
+ FileIO.Write<Void, T> write, FileWriteSchemaTransformConfiguration configuration) {
+
+ if (!Strings.isNullOrEmpty(configuration.getFilenameSuffix())) {
+ write = write.withSuffix(getFilenameSuffix(configuration));
+ }
+
+ if (configuration.getNumShards() != null) {
+ int numShards = getNumShards(configuration);
+ // Python SDK external transforms do not support null values, requiring an additional check.
+ if (numShards > 0) {
+ write = write.withNumShards(numShards);
+ }
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getCompression())) {
+ write = write.withCompression(getCompression(configuration));
+ }
+
+ return write;
+ }
+
+ /**
+ * Applies common parameters from {@link FileWriteSchemaTransformConfiguration} to {@link
+ * TextIO.Write}.
+ */
+ static TextIO.Write applyCommonTextIOWriteFeatures(
+ TextIO.Write write, FileWriteSchemaTransformConfiguration configuration) {
+ write = write.to(configuration.getFilenamePrefix());
+
+ if (!Strings.isNullOrEmpty(configuration.getFilenameSuffix())) {
+ write = write.withSuffix(getFilenameSuffix(configuration));
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getCompression())) {
+ write = write.withCompression(getCompression(configuration));
+ }
+
+ if (configuration.getNumShards() != null) {
+ int numShards = getNumShards(configuration);
+ // Python SDK external transforms do not support null values, requiring an additional check.
+ if (numShards > 0) {
+ write = write.withNumShards(numShards);
+ }
+ }
+
+ if (!Strings.isNullOrEmpty(configuration.getShardNameTemplate())) {
+ write = write.withShardNameTemplate(getShardNameTemplate(configuration));
+ }
+
+ return write;
+ }
+
+ private static Compression getCompression(FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework incompatible argument for valueOf parameter
+ Optional<String> compression = Optional.ofNullable(configuration.getCompression());
+ checkState(compression.isPresent());
+ return Compression.valueOf(compression.get());
+ }
+
+ private static String getFilenameSuffix(FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework incompatible argument for parameter suffix of withSuffix
+ Optional<String> suffix = Optional.ofNullable(configuration.getFilenameSuffix());
+ checkState(suffix.isPresent());
+ return suffix.get();
+ }
+
+ static Integer getNumShards(FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework unboxing a possibly-null reference
+ Optional<Integer> numShards = Optional.ofNullable(configuration.getNumShards());
+ checkState(numShards.isPresent());
+ return numShards.get();
+ }
+
+ static String getShardNameTemplate(FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework incompatible null argument for parameter shardTemplate
+ Optional<String> shardNameTemplate = Optional.ofNullable(configuration.getShardNameTemplate());
+ checkState(shardNameTemplate.isPresent());
+ return shardNameTemplate.get();
+ }
+}
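
A short sketch of the lookup this registry enables; the same check appears in
FileWriteSchemaTransformProvider.getProvider() below, and the format keys are the
lowercase constants declared at the top of this class:

    // Sketch: resolving a format provider by its registered key.
    Map<String, FileWriteSchemaTransformFormatProvider> providers =
        FileWriteSchemaTransformFormatProviders.loadProviders();
    FileWriteSchemaTransformFormatProvider avroProvider =
        providers.get(FileWriteSchemaTransformFormatProviders.AVRO);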
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..ced21527a2f
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.CSV;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * A {@link TypedSchemaTransformProvider} implementation for writing a {@link Row} {@link
+ * PCollection} to file systems, driven by a {@link FileWriteSchemaTransformConfiguration}.
+ */
+public class FileWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<FileWriteSchemaTransformConfiguration> {
+
+ public static final Field FILE_NAME_FIELD = Field.of("fileName", FieldType.STRING);
+ public static final Schema OUTPUT_SCHEMA = Schema.of(FILE_NAME_FIELD);
+
+ private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_write:v1";
+ static final String INPUT_TAG = "input";
+ static final String OUTPUT_TAG = "output";
+
+ /** Provides the required {@link TypedSchemaTransformProvider#configurationClass()}. */
+ @Override
+ protected Class<FileWriteSchemaTransformConfiguration> configurationClass() {
+ return FileWriteSchemaTransformConfiguration.class;
+ }
+
+ /** Builds a {@link SchemaTransform} from a {@link FileWriteSchemaTransformConfiguration}. */
+ @Override
+ protected SchemaTransform from(FileWriteSchemaTransformConfiguration configuration) {
+ return new FileWriteSchemaTransform(configuration);
+ }
+
+ /** Returns the {@link TypedSchemaTransformProvider#identifier()} required for registration. */
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ /** The expected {@link PCollectionRowTuple} input tags. */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /** The expected {@link PCollectionRowTuple} output tags. */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * A {@link PTransform} that converts a {@link PCollectionRowTuple} of {@link
+ * #inputCollectionNames()} tagged {@link Row}s into a {@link PCollectionRowTuple} of {@link
+ * #outputCollectionNames()} tagged {@link Row}s.
+ */
+ static class FileWriteSchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple>
+ implements SchemaTransform {
+
+ final FileWriteSchemaTransformConfiguration configuration;
+
+ FileWriteSchemaTransform(FileWriteSchemaTransformConfiguration configuration) {
+ validateConfiguration(configuration);
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ if (input.getAll().size() != 1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s expects a single %s tagged PCollection<Row> input",
+ FileWriteSchemaTransform.class.getName(), INPUT_TAG));
+ }
+
+ PCollection<Row> rowInput = input.get(INPUT_TAG);
+
+ PTransform<PCollection<Row>, PCollection<String>> transform =
+ getProvider().buildTransform(configuration, rowInput.getSchema());
+
+ PCollection<String> files = rowInput.apply("Write Rows", transform);
+ PCollection<Row> output =
+ files.apply(
+ "Filenames to Rows",
+ MapElements.into(rows())
+ .via(
+ (String name) ->
+ Row.withSchema(OUTPUT_SCHEMA)
+ .withFieldValue(FILE_NAME_FIELD.getName(), name)
+ .build()));
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, output);
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return this;
+ }
+
+ /**
+ * A helper method to retrieve the {@link FileWriteSchemaTransformFormatProvider} mapped to the
+ * {@link FileWriteSchemaTransformConfiguration#getFormat()}.
+ */
+ FileWriteSchemaTransformFormatProvider getProvider() {
+ Map<String, FileWriteSchemaTransformFormatProvider> providers =
+ FileWriteSchemaTransformFormatProviders.loadProviders();
+ if (!providers.containsKey(configuration.getFormat())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s is not a supported format. See %s for a list of supported formats.",
+ configuration.getFormat(),
+ FileWriteSchemaTransformFormatProviders.class.getName()));
+ }
+ // resolves [dereference.of.nullable]
+ Optional<FileWriteSchemaTransformFormatProvider> provider =
+ Optional.ofNullable(providers.get(configuration.getFormat()));
+ checkState(provider.isPresent());
+ return provider.get();
+ }
+
+ /**
+ * Validates a {@link FileWriteSchemaTransformConfiguration} for correctness depending on its
+ * {@link FileWriteSchemaTransformConfiguration#getFormat()}.
+ */
+ static void validateConfiguration(FileWriteSchemaTransformConfiguration configuration) {
+ String format = configuration.getFormat();
+ if (configuration.getCsvConfiguration() != null && !format.equals(CSV)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "configuration with %s is not compatible with a %s format",
+ FileWriteSchemaTransformConfiguration.CsvConfiguration.class.getName(), format));
+ }
+ if (configuration.getParquetConfiguration() != null && !format.equals(PARQUET)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "configuration with %s is not compatible with a %s format",
+ FileWriteSchemaTransformConfiguration.ParquetConfiguration.class.getName(),
+ format));
+ }
+ if (configuration.getXmlConfiguration() != null && !format.equals(XML)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "configuration with %s is not compatible with a %s format",
+ FileWriteSchemaTransformConfiguration.XmlConfiguration.class.getName(), format));
+ }
+ if (format.equals(AVRO) && !Strings.isNullOrEmpty(configuration.getCompression())) {
+ throw new IllegalArgumentException(
+ "configuration with compression is not compatible with AvroIO");
+ }
+ if (format.equals(PARQUET) && !Strings.isNullOrEmpty(configuration.getCompression())) {
+ throw new IllegalArgumentException(
+ "configuration with compression is not compatible with ParquetIO");
+ }
+ }
+ }
+}
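
[Editor's sketch, not from this commit: a minimal end-to-end use of this provider. It assumes same-package access, as the tests later in this diff assume when calling from(configuration) directly; the path and field name are illustrative.]

    Pipeline pipeline = Pipeline.create();
    Schema schema = Schema.of(Field.of("name", FieldType.STRING));
    PCollection<Row> rows =
        pipeline.apply(
            Create.of(Row.withSchema(schema).withFieldValue("name", "beam").build())
                .withRowSchema(schema));
    FileWriteSchemaTransformConfiguration configuration =
        FileWriteSchemaTransformConfiguration.builder()
            .setFormat("json") // any registered format: avro, csv, json, parquet, xml
            .setFilenamePrefix("/tmp/example/out") // illustrative path
            .build();
    SchemaTransform transform = new FileWriteSchemaTransformProvider().from(configuration);
    // INPUT_TAG/OUTPUT_TAG are "input"/"output"; the output rows carry the
    // single "fileName" field of OUTPUT_SCHEMA.
    PCollectionRowTuple result =
        PCollectionRowTuple.of(INPUT_TAG, rows).apply(transform.buildTransform());
    PCollection<Row> fileNames = result.get(OUTPUT_TAG);
    pipeline.run().waitUntilFinish();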
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..68f12e3d26b
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonTextIOWriteFeatures;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for JSON format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class JsonWriteSchemaTransformFormatProvider
+ implements FileWriteSchemaTransformFormatProvider {
+
+ private static final String SUFFIX =
+ String.format(".%s", FileWriteSchemaTransformFormatProviders.JSON);
+
+ @Override
+ public String identifier() {
+ return FileWriteSchemaTransformFormatProviders.JSON;
+ }
+
+ /**
+ * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into a
+ * {@link PCollection} of the resulting file names, written using {@link TextIO.Write}.
+ */
+ @Override
+ public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+ return new PTransform<PCollection<Row>, PCollection<String>>() {
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+
+ PCollection<String> json = input.apply("Row To Json", mapRowsToJsonStrings(schema));
+
+ TextIO.Write write =
+ TextIO.write().to(configuration.getFilenamePrefix()).withSuffix(SUFFIX);
+
+ write = applyCommonTextIOWriteFeatures(write, configuration);
+
+ return json.apply("Write Json", write.withOutputFilenames())
+ .getPerDestinationOutputFilenames()
+ .apply("perDestinationOutputFilenames", Values.create());
+ }
+ };
+ }
+
+ /** Builds a {@link MapElements} transform to map {@link Row} to JSON strings. */
+ MapElements<Row, String> mapRowsToJsonStrings(Schema schema) {
+ return MapElements.into(strings()).via(new RowToJsonFn(schema));
+ }
+
+ private static class RowToJsonFn implements SerializableFunction<Row, String> {
+
+ private final PayloadSerializer payloadSerializer;
+
+ RowToJsonFn(Schema schema) {
+ payloadSerializer =
+ new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of());
+ }
+
+ @Override
+ public String apply(Row input) {
+ return new String(payloadSerializer.serialize(input), StandardCharsets.UTF_8);
+ }
+ }
+}
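
[Editor's sketch, not from this commit: RowToJsonFn above delegates entirely to the schema-aware JSON payload serializer. In isolation that conversion looks like the following; the field name and value are illustrative.]

    Schema schema = Schema.of(Schema.Field.of("id", Schema.FieldType.INT64));
    PayloadSerializer serializer =
        new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of());
    Row row = Row.withSchema(schema).withFieldValue("id", 1L).build();
    // serialize(...) yields UTF-8 JSON bytes, one object per Row, e.g. {"id":1}
    String json = new String(serializer.serialize(row), StandardCharsets.UTF_8);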
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..6981844d229
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonFileIOWriteFeatures;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.service.AutoService;
+import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.ParquetConfiguration;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for Parquet format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class ParquetWriteSchemaTransformFormatProvider
+ implements FileWriteSchemaTransformFormatProvider {
+
+ private static final String SUFFIX =
+ String.format(".%s", FileWriteSchemaTransformFormatProviders.PARQUET);
+
+ @Override
+ public String identifier() {
+ return FileWriteSchemaTransformFormatProviders.PARQUET;
+ }
+
+ /**
+ * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into a
+ * {@link PCollection} of the resulting file names, written using {@link ParquetIO.Sink} and
+ * {@link FileIO.Write}.
+ */
+ @Override
+ public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+ return new PTransform<PCollection<Row>, PCollection<String>>() {
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+ AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+
+ FileIO.Write<Void, GenericRecord> write =
+ FileIO.<GenericRecord>write()
+ .to(configuration.getFilenamePrefix())
+ .via(buildSink(parquetConfiguration(configuration), schema))
+ .withSuffix(SUFFIX);
+
+ write = applyCommonFileIOWriteFeatures(write, configuration);
+
+ return input
+ .apply(
+ "Row To GenericRecord",
+ FileWriteSchemaTransformFormatProviders.mapRowsToGenericRecords(schema))
+ .setCoder(coder)
+ .apply("Write Parquet", write)
+ .getPerDestinationOutputFilenames()
+ .apply("perDestinationOutputFilenames", Values.create());
+ }
+ };
+ }
+
+ private ParquetIO.Sink buildSink(ParquetConfiguration configuration, Schema schema) {
+
+ ParquetIO.Sink sink =
+ ParquetIO.sink(AvroUtils.toAvroSchema(schema))
+ .withCompressionCodec(
+ CompressionCodecName.valueOf(configuration.getCompressionCodecName()));
+
+ if (configuration.getRowGroupSize() != null) {
+ int rowGroupSize = getRowGroupSize(configuration);
+ // Python SDK external transforms do not support null values, requiring this additional check.
+ if (rowGroupSize > 0) {
+ sink = sink.withRowGroupSize(rowGroupSize);
+ }
+ }
+
+ return sink;
+ }
+
+ private static ParquetConfiguration parquetConfiguration(
+ FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework incompatible argument for requireNonNull parameter
+ Optional<ParquetConfiguration> parquetConfiguration =
+ Optional.ofNullable(configuration.getParquetConfiguration());
+ checkState(parquetConfiguration.isPresent());
+ return parquetConfiguration.get();
+ }
+
+ private static Integer getRowGroupSize(ParquetConfiguration configuration) {
+ // resolves Checker Framework [unboxing.of.nullable] unboxing a possibly-null reference
+ Optional<Integer> rowGroupSize = Optional.ofNullable(configuration.getRowGroupSize());
+ checkState(rowGroupSize.isPresent());
+ return rowGroupSize.get();
+ }
+}
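
[Editor's sketch, not from this commit: the sink construction buildSink performs, given a Beam Schema beamSchema. SNAPPY and 128 MiB are illustrative values, not defaults taken from this change.]

    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
    ParquetIO.Sink sink =
        ParquetIO.sink(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            // Applied only when the configuration carries a positive rowGroupSize.
            .withRowGroupSize(128 * 1024 * 1024);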
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
similarity index 51%
copy from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
copy to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
index 6081e783ec2..77457bc8307 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapter.java
@@ -17,27 +17,25 @@
*/
package org.apache.beam.sdk.io.fileschematransform;
-import static org.junit.Assert.assertTrue;
+import java.io.Serializable;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+/** An {@link XmlAdapter} for {@link DateTime}s. */
+class XmlDateTimeAdapter extends XmlAdapter<String, DateTime> implements Serializable {
+ private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTime();
-/** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
-@RunWith(JUnit4.class)
-public class FileWriteSchemaTransformFormatProvidersTest {
+ /** Converts a String into {@link DateTime} based on {@link ISODateTimeFormat}. */
+ @Override
+ public DateTime unmarshal(String v) throws Exception {
+ return DateTime.parse(v, FORMATTER);
+ }
- @Test
- public void testLoadProviders() {
- Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
- FileWriteSchemaTransformFormatProviders.loadProviders();
- Set<String> keys = formatProviderMap.keySet();
- assertTrue(keys.contains("avro"));
- assertTrue(keys.contains("csv"));
- assertTrue(keys.contains("json"));
- assertTrue(keys.contains("parquet"));
- assertTrue(keys.contains("xml"));
+ /** Converts a {@link DateTime} into String based on {@link ISODateTimeFormat}. */
+ @Override
+ public String marshal(DateTime v) throws Exception {
+ return v.toString(FORMATTER);
}
}
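
[Editor's sketch, not from this commit: a round trip through the adapter. The timestamp literal is illustrative; marshal/unmarshal declare throws Exception per the XmlAdapter contract.]

    XmlDateTimeAdapter adapter = new XmlDateTimeAdapter();
    String text = adapter.marshal(DateTime.parse("2023-02-03T09:43:49.000Z"));
    // text is the ISO-8601 rendering; unmarshal recovers an equal instant.
    DateTime parsed = adapter.unmarshal(text);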
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java
new file mode 100644
index 00000000000..0b5d859dadf
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Wraps a {@link Row} for compatible use with {@link javax.xml.bind.JAXBContext}. {@link
+ * XmlRowAdapter} allows {@link XmlWriteSchemaTransformFormatProvider} to convert {@link Row} to XML
+ * strings with no knowledge of the original Java class. {@link javax.xml.bind.Marshaller} requires
+ * Java classes to be annotated with {@link XmlRootElement}, preventing the {@link
+ * javax.xml.bind.Marshaller#marshal(Object, Writer)} of {@link Row}s directly. {@link
+ * XmlRowAdapter} exposes the String key and Object value pairs of the {@link Row} to the {@link
+ * javax.xml.bind.Marshaller}.
+ */
+@XmlRootElement(name = "row")
+@XmlAccessorType(XmlAccessType.PROPERTY)
+class XmlRowAdapter implements Serializable {
+
+ private final HashMap<String, XmlRowValue> record = new HashMap<>();
+
+ /**
+ * Wraps a {@link Row} to prepare the {@link XmlRowAdapter} for use with {@link
+ * javax.xml.bind.Marshaller}. {@link XmlRowAdapter} stores a {@link HashMap} that this method
+ * fills from {@link Row}'s String key and Object value pairs. This copying of data to the {@link
+ * HashMap} allows {@link javax.xml.bind.Marshaller} to populate the XML elements from {@link
+ * #getData()}.
+ */
+ void wrapRow(Row row) {
+ Schema schema = row.getSchema();
+ for (String key : schema.getFieldNames()) {
+ XmlRowValue value = new XmlRowValue();
+ value.setValue(key, row);
+ record.put(key, value);
+ }
+ }
+
+ /**
+ * Exposes the copied {@link Row} data to the {@link javax.xml.bind.Marshaller} via the {@link
+ * XmlElement} annotation.
+ */
+ @XmlElement
+ HashMap<String, XmlRowValue> getData() {
+ return record;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ XmlRowAdapter that = (XmlRowAdapter) o;
+ return record.equals(that.record);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(record);
+ }
+
+ @Override
+ public String toString() {
+ return "XmlRowAdapter{" + "record=" + record + '}';
+ }
+}
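
[Editor's sketch, not from this commit: wrapping a Row before JAXB marshalling. The field name is illustrative, and same-package access is assumed since the adapter is package-private.]

    Schema schema = Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
    Row row = Row.withSchema(schema).withFieldValue("name", "beam").build();
    XmlRowAdapter adapter = new XmlRowAdapter();
    adapter.wrapRow(row);
    // A JAXB Marshaller for XmlRowAdapter now emits a <row> element whose
    // entries come from getData(), one per Row field.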
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java
new file mode 100644
index 00000000000..df4392665a4
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValue.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+
+/**
+ * Implements an {@link XmlType} of {@link Row} values for compatible use with {@link
+ * javax.xml.bind.JAXBContext}. {@link XmlRowValue} allows {@link
+ * XmlWriteSchemaTransformFormatProvider} to convert {@link Row} values to XML strings with no
+ * knowledge of the original Java class. {@link #setValue(String, Row)} serves as the algorithm's
+ * entry point.
+ */
+@XmlType
+class XmlRowValue implements Serializable {
+ @Nullable private Object primitiveValue = null;
+
+ @Nullable private ArrayList<XmlRowValue> valueList = null;
+
+ @Nullable private DateTime dateTimeValue = null;
+
+ @Nullable private HashMap<String, XmlRowValue> nestedValue = null;
+
+ /**
+ * A {@link Row}'s value for a primitive type such as {@link FieldType#STRING}, {@link
+ * FieldType#DOUBLE}, etc.
+ */
+ @XmlElement(name = "value")
+ @Nullable
+ Object getPrimitiveValue() {
+ return primitiveValue;
+ }
+
+ /**
+ * A {@link Row}'s value for a {@link FieldType#DATETIME}, converted using {@link
+ * XmlDateTimeAdapter}.
+ */
+ @XmlElement(name = "value")
+ @XmlJavaTypeAdapter(XmlDateTimeAdapter.class)
+ @Nullable
+ DateTime getDateTimeValue() {
+ return dateTimeValue;
+ }
+
+ /** A {@link Row}'s value for a {@link TypeName#ARRAY} or {@link TypeName#ITERABLE} type. */
+ @XmlElement(name = "array")
+ @Nullable
+ ArrayList<XmlRowValue> getValueList() {
+ return valueList;
+ }
+
+ /** A {@link Row}'s value for a nested {@link TypeName#ROW} value. */
+ @XmlElement(name = "row")
+ @Nullable
+ HashMap<String, XmlRowValue> getNestedValue() {
+ return nestedValue;
+ }
+
+ void setPrimitiveValue(Object primitiveValue) {
+ this.primitiveValue = primitiveValue;
+ }
+
+ void setValueList(ArrayList<XmlRowValue> valueList) {
+ this.valueList = valueList;
+ }
+
+ /**
+ * The entry point for parsing a {@link Row}'s value mapped from the given key. Primitive
+ * types populate {@link #setPrimitiveValue(Object)}. {@link FieldType#DATETIME} values populate
+ * {@link #setDateTimeValue(ReadableDateTime)}. {@link TypeName#ARRAY} or {@link
+ * TypeName#ITERABLE} values populate {@link #setArrayValue(String, Field, Row)} and {@link
+ * TypeName#ROW} nested values populate {@link #setNestedValue(Row)}.
+ */
+ void setValue(String key, Row parent) {
+ Schema schema = parent.getSchema();
+ Field field = schema.getField(key);
+ FieldType fieldType = field.getType();
+ TypeName typeName = fieldType.getTypeName();
+ switch (typeName) {
+ case BOOLEAN:
+ case BYTE:
+ case DECIMAL:
+ case DOUBLE:
+ case FLOAT:
+ case INT16:
+ case INT32:
+ case INT64:
+ case STRING:
+ primitiveValue = parent.getValue(key);
+ return;
+ case ARRAY:
+ case ITERABLE:
+ setArrayValue(key, field, parent);
+ return;
+ case DATETIME:
+ setDateTimeValue(key, parent);
+ return;
+ case ROW:
+ Optional<Row> child = Optional.ofNullable(parent.getRow(key));
+ checkState(child.isPresent());
+ setNestedValue(child.get());
+ return;
+ default:
+ throw new IllegalArgumentException(
+ String.format("%s at key %s is not supported", typeName.name(), key));
+ }
+ }
+
+ private void setArrayValue(String key, Field field, Row parent) {
+ Optional<FieldType> collectionFieldType =
+ Optional.ofNullable(field.getType().getCollectionElementType());
+ checkState(collectionFieldType.isPresent());
+ TypeName collectionFieldTypeName = collectionFieldType.get().getTypeName();
+ Optional<Iterable<Object>> iterable = Optional.ofNullable(parent.getIterable(key));
+ checkState(iterable.isPresent());
+ valueList = new ArrayList<>();
+ Optional<ArrayList<XmlRowValue>> safeValueList = Optional.of(valueList);
+
+ switch (collectionFieldTypeName) {
+ case BOOLEAN:
+ case BYTE:
+ case DECIMAL:
+ case DOUBLE:
+ case FLOAT:
+ case INT16:
+ case INT32:
+ case INT64:
+ case STRING:
+ for (Object element : iterable.get()) {
+ XmlRowValue value = new XmlRowValue();
+ value.setPrimitiveValue(element);
+ safeValueList.get().add(value);
+ }
+ return;
+ case DATETIME:
+ for (Object element : iterable.get()) {
+ XmlRowValue value = new XmlRowValue();
+ value.setDateTimeValue(((Instant) element).toDateTime());
+ safeValueList.get().add(value);
+ }
+ return;
+ case ROW:
+ for (Object value : iterable.get()) {
+ Row nestedValue = (Row) value;
+ XmlRowValue element = new XmlRowValue();
+ element.setNestedValue(nestedValue);
+ safeValueList.get().add(element);
+ }
+ return;
+ default:
+ throw new IllegalArgumentException(
+ String.format("%s at key %s is not supported", collectionFieldType, key));
+ }
+ }
+
+ private void setDateTimeValue(String key, Row parent) {
+ Optional<ReadableDateTime> value = Optional.ofNullable(parent.getDateTime(key));
+ checkState(value.isPresent());
+ setDateTimeValue(value.get());
+ }
+
+ private void setDateTimeValue(ReadableDateTime value) {
+ dateTimeValue = Instant.ofEpochMilli(value.getMillis()).toDateTime();
+ }
+
+ private void setNestedValue(Row row) {
+ Schema schema = row.getSchema();
+ nestedValue = new HashMap<>();
+ Optional<HashMap<String, XmlRowValue>> safeNestedValue = Optional.of(nestedValue);
+ for (String key : schema.getFieldNames()) {
+ XmlRowValue child = new XmlRowValue();
+ child.setValue(key, row);
+ safeNestedValue.get().put(key, child);
+ }
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ XmlRowValue that = (XmlRowValue) o;
+
+ Optional<Object> thatPrimitiveValue = Optional.ofNullable(that.getPrimitiveValue());
+ Optional<Object> primitiveValue = Optional.ofNullable(getPrimitiveValue());
+
+ Optional<DateTime> thatPrimitiveDateTime = Optional.ofNullable(that.getDateTimeValue());
+ Optional<DateTime> dateTime = Optional.ofNullable(getDateTimeValue());
+
+ Optional<ArrayList<XmlRowValue>> thatValueList = Optional.ofNullable(that.getValueList());
+ Optional<ArrayList<XmlRowValue>> valueList = Optional.ofNullable(getValueList());
+
+ Optional<HashMap<String, XmlRowValue>> thatNestedValue =
+ Optional.ofNullable(that.getNestedValue());
+ Optional<HashMap<String, XmlRowValue>> nestedValue = Optional.ofNullable(getNestedValue());
+
+ return equals(primitiveValue, thatPrimitiveValue)
+ && equals(dateTime, thatPrimitiveDateTime)
+ && equals(valueList, thatValueList)
+ && equals(nestedValue, thatNestedValue);
+ }
+
+ private static <T> boolean equals(Optional<T> a, Optional<T> b) {
+ if (a.isPresent() && b.isPresent()) {
+ return a.get().equals(b.get());
+ }
+ if (!a.isPresent() && !b.isPresent()) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ // resolves dereference of possibly-null reference
+ Optional<Object> primitive = Optional.ofNullable(getPrimitiveValue());
+ Optional<ArrayList<XmlRowValue>> list = Optional.ofNullable(getValueList());
+ Optional<DateTime> dateTime = Optional.ofNullable(getDateTimeValue());
+ Optional<HashMap<String, XmlRowValue>> nested = Optional.ofNullable(getNestedValue());
+
+ int result = primitive.hashCode();
+ result = 31 * result + list.hashCode();
+ result = 31 * result + dateTime.hashCode();
+ result = 31 * result + nested.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "XmlRowValue{" + "primitiveValue=" + primitiveValue + '}';
+ }
+}
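
[Editor's sketch, not from this commit: how setValue(String, Row) dispatches for two field kinds. Field names are illustrative, and same-package access is assumed.]

    Schema schema =
        Schema.builder().addStringField("name").addDateTimeField("created").build();
    Row row =
        Row.withSchema(schema)
            .withFieldValue("name", "beam")
            .withFieldValue("created", Instant.ofEpochMilli(0L))
            .build();
    XmlRowValue name = new XmlRowValue();
    name.setValue("name", row); // STRING populates primitiveValue
    XmlRowValue created = new XmlRowValue();
    created.setValue("created", row); // DATETIME populates dateTimeValue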
diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java
new file mode 100644
index 00000000000..7c8f4ff0c84
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/XmlWriteSchemaTransformFormatProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.applyCommonFileIOWriteFeatures;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.Charset;
+import java.util.Optional;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.XmlConfiguration;
+import org.apache.beam.sdk.io.xml.XmlIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/** A {@link FileWriteSchemaTransformFormatProvider} for XML format. */
+@AutoService(FileWriteSchemaTransformFormatProvider.class)
+public class XmlWriteSchemaTransformFormatProvider
+ implements FileWriteSchemaTransformFormatProvider {
+
+ private static final String SUFFIX = String.format(".%s", XML);
+
+ @Override
+ public String identifier() {
+ return XML;
+ }
+
+ /**
+ * Builds a {@link PTransform} that transforms a {@link Row} {@link PCollection} into a
+ * {@link PCollection} of the resulting file names, written using {@link XmlIO.Sink} and
+ * {@link FileIO.Write}.
+ */
+ @Override
+ public PTransform<PCollection<Row>, PCollection<String>> buildTransform(
+ FileWriteSchemaTransformConfiguration configuration, Schema schema) {
+ return new PTransform<PCollection<Row>, PCollection<String>>() {
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+
+ PCollection<XmlRowAdapter> xml =
+ input.apply(
+ "Row to XML",
+ MapElements.into(TypeDescriptor.of(XmlRowAdapter.class)).via(new RowToXmlFn()));
+
+ XmlConfiguration xmlConfig = xmlConfiguration(configuration);
+
+ checkArgument(!Strings.isNullOrEmpty(xmlConfig.getCharset()), "charset must be specified");
+ checkArgument(
+ !Strings.isNullOrEmpty(xmlConfig.getRootElement()), "rootElement must be specified");
+
+ Charset charset = Charset.forName(xmlConfig.getCharset());
+
+ XmlIO.Sink<XmlRowAdapter> sink =
+ XmlIO.sink(XmlRowAdapter.class)
+ .withCharset(charset)
+ .withRootElement(xmlConfig.getRootElement());
+
+ FileIO.Write<Void, XmlRowAdapter> write =
+ FileIO.<XmlRowAdapter>write()
+ .to(configuration.getFilenamePrefix())
+ .via(sink)
+ .withSuffix(SUFFIX);
+
+ write = applyCommonFileIOWriteFeatures(write, configuration);
+
+ return xml.apply("Write XML", write)
+ .getPerDestinationOutputFilenames()
+ .apply("perDestinationOutputFilenames", Values.create());
+ }
+ };
+ }
+
+ /** A {@link SerializableFunction} for converting {@link Row}s to {@link XmlRowAdapter}s. */
+ static class RowToXmlFn implements SerializableFunction<Row, XmlRowAdapter> {
+
+ @Override
+ public XmlRowAdapter apply(Row input) {
+ XmlRowAdapter result = new XmlRowAdapter();
+ result.wrapRow(input);
+ return result;
+ }
+ }
+
+ private static XmlConfiguration xmlConfiguration(
+ FileWriteSchemaTransformConfiguration configuration) {
+ // resolves Checker Framework incompatible argument for parameter of requireNonNull
+ Optional<XmlConfiguration> result = Optional.ofNullable(configuration.getXmlConfiguration());
+ checkState(result.isPresent());
+ return result.get();
+ }
+}
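
[Editor's sketch, not from this commit: a configuration that satisfies the charset and rootElement checks above. The prefix path and root element name are illustrative.]

    FileWriteSchemaTransformConfiguration configuration =
        FileWriteSchemaTransformConfiguration.builder()
            .setFormat("xml")
            .setFilenamePrefix("/tmp/example/xml/out") // illustrative path
            .setXmlConfiguration(
                FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder()
                    .setRootElement("rows") // the enclosing XML element
                    .setCharset(StandardCharsets.UTF_8.name())
                    .build())
            .build();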
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
similarity index 100%
rename from sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
rename to sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/package-info.java
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..466ea475d31
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link AvroWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class AvroFileWriteSchemaTransformFormatProviderTest
+ extends FileWriteSchemaTransformFormatProviderTest {
+ @Override
+ protected String getFormat() {
+ return AVRO;
+ }
+
+ @Override
+ protected String getFilenamePrefix() {
+ return "/";
+ }
+
+ @Override
+ protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ AvroGenericCoder coder = AvroGenericCoder.of(avroSchema);
+ List<GenericRecord> expected =
+ rows.stream()
+ .map(AvroUtils.getRowToGenericRecordFunction(avroSchema)::apply)
+ .collect(Collectors.toList());
+
+ PCollection<GenericRecord> actual =
+ readPipeline
+ .apply(AvroIO.readGenericRecords(avroSchema).from(folder + getFilenamePrefix() + "*"))
+ .setCoder(coder);
+
+ PAssert.that(actual).containsInAnyOrder(expected);
+ }
+
+ @Override
+ protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+ return defaultConfiguration(folder);
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCompressionSet() {
+ return Optional.of("configuration with compression is not compatible with AvroIO");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a avro format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a avro format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenNumShardsSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a avro format");
+ }
+}
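
[Editor's sketch, not from this commit: the assertFolderContainsInAnyOrder override above reads the written files back as GenericRecords. In isolation that read-back looks like the following; the glob is illustrative and beamSchema is assumed.]

    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
    PCollection<GenericRecord> records =
        readPipeline
            .apply(AvroIO.readGenericRecords(avroSchema).from("/tmp/example/avro/*"))
            .setCoder(AvroGenericCoder.of(avroSchema));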
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..015ce5245fe
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SINGLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.csvConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.loadProviders;
+import static org.apache.beam.sdk.values.TypeDescriptors.booleans;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Base class for tests of {@link FileWriteSchemaTransformFormatProvider} implementations. */
+abstract class FileWriteSchemaTransformFormatProviderTest {
+
+ /**
+ * The {@link FileWriteSchemaTransformConfiguration#getFormat()} mapped to this {@link
+ * FileWriteSchemaTransformFormatProvider}.
+ */
+ protected abstract String getFormat();
+
+ /**
+ * The filename prefix of sharded files, required by {@link org.apache.beam.sdk.io.TextIO.Write}
+ * based {@link FileWriteSchemaTransformFormatProvider}s.
+ */
+ protected abstract String getFilenamePrefix();
+
+ /**
+ * Asserts that the {@link FileWriteSchemaTransformFormatProvider} wrote the expected contents to
+ * {@link FileWriteSchemaTransformConfiguration#getFilenamePrefix()}.
+ */
+ protected abstract void assertFolderContainsInAnyOrder(
+ String folder, List<Row> rows, Schema beamSchema);
+
+ /**
+ * Builds a {@link FileWriteSchemaTransformConfiguration} specifying the folder as its {@link
+ * FileWriteSchemaTransformConfiguration#getFilenamePrefix()}.
+ */
+ protected abstract FileWriteSchemaTransformConfiguration buildConfiguration(String folder);
+
+ /**
+ * The expected error message when {@link FileWriteSchemaTransformConfiguration#getCompression()}
+ * is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenCompressionSet();
+
+ @Test
+ public void withCompression() {
+ String to = folder(AllPrimitiveDataTypes.class, "with_compression");
+ Compression compression = Compression.GZIP;
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to).toBuilder().setCompression(compression.name()).build();
+
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+
+ if (expectedErrorWhenCompressionSet().isPresent()) {
+ IllegalArgumentException invalidConfiguration =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+ assertEquals(expectedErrorWhenCompressionSet().get(), invalidConfiguration.getMessage());
+ return;
+ }
+
+ PCollection<String> files =
+ applyProviderAndAssertFilesWritten(
+ DATA.allPrimitiveDataTypesRows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+
+ PCollection<String> extension =
+ files
+ .apply(
+ "extract extension",
+ MapElements.into(strings())
+ .via(fullName -> fullName != null ? Files.getFileExtension(fullName) : null))
+ .apply("distinct extensions", Distinct.create());
+
+ PCollection<Boolean> isCompressed =
+ files
+ .apply(
+ "isCompressed",
+ MapElements.into(booleans())
+ .via(filename -> filename != null && compression.isCompressed(filename)))
+ .apply("distinct isCompressed", Distinct.create());
+
+ PAssert.thatSingleton("Filenames end with compression name", extension).isEqualTo("gz");
+
+ PAssert.thatSingleton("Files should be compressed", isCompressed).isEqualTo(true);
+
+ writePipeline.run();
+ }
+
+ /**
+ * The expected error message when {@link
+ * FileWriteSchemaTransformConfiguration#getParquetConfiguration()} is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenParquetConfigurationSet();
+
+ @Test
+ public void invalidConfigurationWithParquet() {
+ String to = folder(getFormat(), "configuration_with_parquet");
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to)
+ .toBuilder()
+ .setParquetConfiguration(
+ parquetConfigurationBuilder()
+ .setCompressionCodecName(CompressionCodecName.GZIP.name())
+ .build())
+ .build();
+
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+
+ if (!expectedErrorWhenParquetConfigurationSet().isPresent()) {
+ // we do not expect an error
+ provider.from(configuration);
+ return;
+ }
+
+ IllegalArgumentException invalidConfigurationError =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+
+ assertEquals(
+ expectedErrorWhenParquetConfigurationSet().get(), invalidConfigurationError.getMessage());
+ }
+
+ /**
+ * The expected error message when {@link
+ * FileWriteSchemaTransformConfiguration#getXmlConfiguration()} is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenXmlConfigurationSet();
+
+ @Test
+ public void invalidConfigurationWithXml() {
+ String to = folder(getFormat(), "configuration_with_xml");
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to)
+ .toBuilder()
+ .setXmlConfiguration(
+ xmlConfigurationBuilder()
+ .setRootElement("rootElement")
+ .setCharset(Charset.defaultCharset().name())
+ .build())
+ .build();
+
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+ if (!expectedErrorWhenXmlConfigurationSet().isPresent()) {
+ // No error expected
+ provider.from(configuration);
+ return;
+ }
+
+ IllegalArgumentException configurationError =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+
+ assertEquals(expectedErrorWhenXmlConfigurationSet().get(), configurationError.getMessage());
+ }
+
+ /**
+ * The expected error message when {@link FileWriteSchemaTransformConfiguration#getNumShards()}
+ * is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenNumShardsSet();
+
+ @Test
+ public void numShardsSetConfiguration() {
+ String to = folder(AllPrimitiveDataTypes.class, "num_shards_configuration");
+ int expectedNumShards = 10;
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to).toBuilder().setNumShards(expectedNumShards).build();
+
+ if (expectedErrorWhenNumShardsSet().isPresent()) {
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+ IllegalArgumentException configurationError =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+ assertEquals(expectedErrorWhenNumShardsSet().get(), configurationError.getMessage());
+ return;
+ }
+
+ List<Row> rows = new ArrayList<>(DATA.allPrimitiveDataTypesRows);
+ for (int i = 0; i < 100; i++) {
+ rows.addAll(DATA.allPrimitiveDataTypesRows);
+ }
+
+ PCollection<String> files =
+ applyProviderAndAssertFilesWritten(rows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+ PCollection<Long> count = files.apply(Count.globally());
+ PAssert.thatSingleton("Amount of files created should match numShards", count)
+ .isEqualTo(Integer.valueOf(expectedNumShards).longValue());
+
+ writePipeline.run();
+ }
+
+ /**
+ * The expected error message when {@link
+ * FileWriteSchemaTransformConfiguration#getShardNameTemplate()} is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenShardNameTemplateSet();
+
+ @Test
+ public void shardNameTemplateSetConfiguration() {
+ String to = folder(AllPrimitiveDataTypes.class, "shard_name_template");
+ String shardNameTemplate = "-SS-of-NN";
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to).toBuilder().setShardNameTemplate(shardNameTemplate).build();
+
+ if (expectedErrorWhenShardNameTemplateSet().isPresent()) {
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+ IllegalArgumentException configurationError =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+ assertEquals(expectedErrorWhenShardNameTemplateSet().get(), configurationError.getMessage());
+ return;
+ }
+
+ PCollection<String> files =
+ applyProviderAndAssertFilesWritten(
+ DATA.allPrimitiveDataTypesRows, ALL_PRIMITIVE_DATA_TYPES_SCHEMA, configuration);
+ PAssert.that("All file names match shard name template", files)
+ .satisfies(
+ (Iterable<String> names) -> {
+ assertNotNull(names);
+ for (String name : names) {
+ assertTrue(name.matches("^.*\\d\\d-of-\\d\\d.*$"));
+ }
+ return null;
+ });
+
+ writePipeline.run();
+ }
+
+ /**
+ * The expected error message when {@link
+ * FileWriteSchemaTransformConfiguration#getCsvConfiguration()} is not null.
+ */
+ protected abstract Optional<String> expectedErrorWhenCsvConfigurationSet();
+
+ @Test
+ public void csvConfigurationSet() {
+ String to = folder(getFormat(), "csv_configuration");
+ FileWriteSchemaTransformProvider provider = new FileWriteSchemaTransformProvider();
+ FileWriteSchemaTransformConfiguration configuration =
+ buildConfiguration(to)
+ .toBuilder()
+ .setCsvConfiguration(csvConfigurationBuilder().build())
+ .build();
+ if (!expectedErrorWhenCsvConfigurationSet().isPresent()) {
+ // No error expected
+ provider.from(configuration);
+ return;
+ }
+ IllegalArgumentException configurationError =
+ assertThrows(IllegalArgumentException.class, () -> provider.from(configuration));
+ assertEquals(expectedErrorWhenCsvConfigurationSet().get(), configurationError.getMessage());
+ }
+
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void allPrimitiveDataTypes() {
+ String to = folder(SchemaAwareJavaBeans.AllPrimitiveDataTypes.class);
+ Schema schema = ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.allPrimitiveDataTypesRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void nullableAllPrimitiveDataTypes() {
+ String to = folder(SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes.class);
+ Schema schema = NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.nullableAllPrimitiveDataTypesRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void timeContaining() {
+ String to = folder(SchemaAwareJavaBeans.TimeContaining.class);
+ Schema schema = TIME_CONTAINING_SCHEMA;
+ List<Row> rows = DATA.timeContainingRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void arrayPrimitiveDataTypes() {
+ String to = folder(SchemaAwareJavaBeans.ArrayPrimitiveDataTypes.class);
+ Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.arrayPrimitiveDataTypesRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void singlyNestedDataTypesNoRepeat() {
+ String to = folder(SchemaAwareJavaBeans.SinglyNestedDataTypes.class, "no_repeat");
+ Schema schema = SINGLY_NESTED_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.singlyNestedDataTypesNoRepeatRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void singlyNestedDataTypesRepeated() {
+ String to = folder(SchemaAwareJavaBeans.SinglyNestedDataTypes.class, "repeated");
+ Schema schema = SINGLY_NESTED_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.singlyNestedDataTypesNoRepeatRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void doublyNestedDataTypesNoRepeat() {
+ String to = folder(SchemaAwareJavaBeans.DoublyNestedDataTypes.class, "no_repeat");
+ Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.doublyNestedDataTypesNoRepeatRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ @Test
+ public void doublyNestedDataTypesRepeat() {
+ String to = folder(SchemaAwareJavaBeans.DoublyNestedDataTypes.class, "repeated");
+ Schema schema = DOUBLY_NESTED_DATA_TYPES_SCHEMA;
+ List<Row> rows = DATA.doublyNestedDataTypesRepeatRows;
+ applyProviderAndAssertFilesWritten(to, rows, schema);
+ writePipeline.run().waitUntilFinish();
+ assertFolderContainsInAnyOrder(to, rows, schema);
+ readPipeline.run();
+ }
+
+ private FileWriteSchemaTransformFormatProvider getProvider() {
+ return loadProviders().get(getFormat());
+ }
+
+ private <T> String folder(Class<T> clazz, String additionalPath) {
+ return folder(getFormat(), clazz.getSimpleName(), additionalPath);
+ }
+
+ private <T> String folder(Class<T> clazz) {
+ return folder(getFormat(), clazz.getSimpleName());
+ }
+
+ private String folder(String... paths) {
+ try {
+ return tmpFolder.newFolder(paths).getAbsolutePath() + getFilenamePrefix();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
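+ /**
+ * Applies the provider under test to {@code rows} and asserts that at least one file was
+ * written, returning the resulting file names for further assertions.
+ */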
+ private PCollection<String> applyProviderAndAssertFilesWritten(
+ String folder, List<Row> rows, Schema schema) {
+ return applyProviderAndAssertFilesWritten(rows, schema, buildConfiguration(folder));
+ }
+
+ private PCollection<String> applyProviderAndAssertFilesWritten(
+ List<Row> rows, Schema schema, FileWriteSchemaTransformConfiguration configuration) {
+ PCollection<Row> input = writePipeline.apply(Create.of(rows).withRowSchema(schema));
+ PCollection<String> files = input.apply(getProvider().buildTransform(configuration, schema));
+ PCollection<Long> count = files.apply("count number of files", Count.globally());
+ PAssert.thatSingleton("At least one file should be written", count).notEqualTo(0L);
+ return files;
+ }
+
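+ /** A configuration carrying only the required format and filename prefix. */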
+ protected FileWriteSchemaTransformConfiguration defaultConfiguration(String folder) {
+ return FileWriteSchemaTransformConfiguration.builder()
+ .setFormat(getFormat())
+ .setFilenamePrefix(folder)
+ .build();
+ }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java
new file mode 100644
index 00000000000..82ea6deeb71
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviderTestData.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.arrayPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.doublyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypes;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.singlyNestedDataTypesToRowFn;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ArrayPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.DoublyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NullableAllPrimitiveDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SinglyNestedDataTypes;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+
+/** Shared {@link SchemaAwareJavaBeans} data to be used across various tests. */
+class FileWriteSchemaTransformFormatProviderTestData {
+ static final FileWriteSchemaTransformFormatProviderTestData DATA =
+ new FileWriteSchemaTransformFormatProviderTestData();
+
+ /* Prevent instantiation outside this class. */
+ private FileWriteSchemaTransformFormatProviderTestData() {}
+
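+ // Each bean list below is paired with its Row equivalent so format tests can compare written
+ // output against the same expected data in either representation.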
+ final List<AllPrimitiveDataTypes> allPrimitiveDataTypesList =
+ Arrays.asList(
+ allPrimitiveDataTypes(
+ false, (byte) 1, BigDecimal.valueOf(1L), 1.2345, 1.2345f, (short) 1, 1, 1L, "a"),
+ allPrimitiveDataTypes(
+ true, (byte) 2, BigDecimal.valueOf(2L), 2.2345, 2.2345f, (short) 2, 2, 2L, "b"),
+ allPrimitiveDataTypes(
+ false, (byte) 3, BigDecimal.valueOf(3L), 3.2345, 3.2345f, (short) 3, 3, 3L, "c"));
+
+ final List<Row> allPrimitiveDataTypesRows =
+ allPrimitiveDataTypesList.stream()
+ .map(allPrimitiveDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
+ final List<NullableAllPrimitiveDataTypes> nullableAllPrimitiveDataTypesList =
+ Arrays.asList(
+ nullableAllPrimitiveDataTypes(null, null, null, null, null, null),
+ nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, null),
+ nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, null, "a"),
+ nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, null, 1L, "a"),
+ nullableAllPrimitiveDataTypes(false, 1.2345, null, 1, 1L, "a"),
+ nullableAllPrimitiveDataTypes(false, null, 1.2345f, 1, 1L, "a"),
+ nullableAllPrimitiveDataTypes(null, 1.2345, 1.2345f, 1, 1L, "a"),
+ nullableAllPrimitiveDataTypes(false, 1.2345, 1.2345f, 1, 1L, "a"));
+
+ final List<Row> nullableAllPrimitiveDataTypesRows =
+ nullableAllPrimitiveDataTypesList.stream()
+ .map(nullableAllPrimitiveDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
+ final List<TimeContaining> timeContainingList =
+ Arrays.asList(
+ timeContaining(
+ Instant.ofEpochMilli(1L), Collections.singletonList(Instant.ofEpochMilli(2L))),
+ timeContaining(
+ Instant.ofEpochMilli(Long.MAX_VALUE - 2L),
+ Arrays.asList(Instant.ofEpochMilli(3L), Instant.ofEpochMilli(4L))));
+
+ final List<Row> timeContainingRows =
+ timeContainingList.stream().map(timeContainingToRowFn()::apply).collect(Collectors.toList());
+
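+ // Each element below populates a single array field, leaving the rest empty, so a failure
+ // points at the offending type; later elements mix boundary values and long repeated strings,
+ // and the final element covers the all-empty case.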
+ final List<ArrayPrimitiveDataTypes> arrayPrimitiveDataTypesList =
+ Arrays.asList(
+ arrayPrimitiveDataTypes(
+ Collections.singletonList(false),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.singletonList(Double.MAX_VALUE),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(Float.MAX_VALUE),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(Short.MAX_VALUE),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(Integer.MAX_VALUE),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(Long.MAX_VALUE),
+ Collections.emptyList()),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(
+ Stream.generate(() -> "🦄").limit(100).collect(Collectors.joining("")))),
+ arrayPrimitiveDataTypes(
+ Arrays.asList(false, true, false),
+ Arrays.asList(Double.MIN_VALUE, 0.0, Double.MAX_VALUE),
+ Arrays.asList(Float.MIN_VALUE, 0.0f, Float.MAX_VALUE),
+ Arrays.asList(Short.MIN_VALUE, (short) 0, Short.MAX_VALUE),
+ Arrays.asList(Integer.MIN_VALUE, 0, Integer.MAX_VALUE),
+ Arrays.asList(Long.MIN_VALUE, 0L, Long.MAX_VALUE),
+ Arrays.asList(
+ Stream.generate(() -> "🐤").limit(100).collect(Collectors.joining("")),
+ Stream.generate(() -> "🐥").limit(100).collect(Collectors.joining("")),
+ Stream.generate(() -> "🐣").limit(100).collect(Collectors.joining("")))),
+ arrayPrimitiveDataTypes(
+ Stream.generate(() -> true).limit(Short.MAX_VALUE).collect(Collectors.toList()),
+ Stream.generate(() -> Double.MIN_VALUE).limit(100).collect(Collectors.toList()),
+ Stream.generate(() -> Float.MIN_VALUE).limit(100).collect(Collectors.toList()),
+ Stream.generate(() -> Short.MIN_VALUE).limit(100).collect(Collectors.toList()),
+ Stream.generate(() -> Integer.MIN_VALUE).limit(100).collect(Collectors.toList()),
+ Stream.generate(() -> Long.MIN_VALUE).limit(100).collect(Collectors.toList()),
+ Stream.generate(() -> "🐿").limit(100).collect(Collectors.toList())),
+ arrayPrimitiveDataTypes(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+
+ final List<Row> arrayPrimitiveDataTypesRows =
+ arrayPrimitiveDataTypesList.stream()
+ .map(arrayPrimitiveDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
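+ // "NoRepeat" variants leave the nested repeated field empty; "Repeated" variants fill it with
+ // copies of the same nested element.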
+ final List<SinglyNestedDataTypes> singlyNestedDataTypesNoRepeat =
+ allPrimitiveDataTypesList.stream()
+ .map(SchemaAwareJavaBeans::singlyNestedDataTypes)
+ .collect(Collectors.toList());
+
+ final List<Row> singlyNestedDataTypesNoRepeatRows =
+ singlyNestedDataTypesNoRepeat.stream()
+ .map(singlyNestedDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
+ final List<SinglyNestedDataTypes> singlyNestedDataTypesRepeated =
+ allPrimitiveDataTypesList.stream()
+ .map(
+ (AllPrimitiveDataTypes element) ->
+ singlyNestedDataTypes(element, element, element, element))
+ .collect(Collectors.toList());
+
+ final List<Row> singlyNestedDataTypesRepeatedRows =
+ singlyNestedDataTypesRepeated.stream()
+ .map(singlyNestedDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
+ final List<DoublyNestedDataTypes> doublyNestedDataTypesNoRepeat =
+ singlyNestedDataTypesNoRepeat.stream()
+ .map(SchemaAwareJavaBeans::doublyNestedDataTypes)
+ .collect(Collectors.toList());
+
+ final List<Row> doublyNestedDataTypesNoRepeatRows =
+ doublyNestedDataTypesNoRepeat.stream()
+ .map(doublyNestedDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+
+ final List<DoublyNestedDataTypes> doublyNestedDataTypesRepeat =
+ singlyNestedDataTypesRepeated.stream()
+ .map((SinglyNestedDataTypes element) -> doublyNestedDataTypes(element, element, element))
+ .collect(Collectors.toList());
+
+ final List<Row> doublyNestedDataTypesRepeatRows =
+ doublyNestedDataTypesRepeat.stream()
+ .map(doublyNestedDataTypesToRowFn()::apply)
+ .collect(Collectors.toList());
+}
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
similarity index 64%
copy from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
copy to sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
index 6081e783ec2..b145243fb14 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
@@ -17,10 +17,16 @@
*/
package org.apache.beam.sdk.io.fileschematransform;
-import static org.junit.Assert.assertTrue;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.CSV;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.Set;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -28,16 +34,11 @@ import org.junit.runners.JUnit4;
/** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
@RunWith(JUnit4.class)
public class FileWriteSchemaTransformFormatProvidersTest {
-
@Test
- public void testLoadProviders() {
+ public void loadProviders() {
Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
FileWriteSchemaTransformFormatProviders.loadProviders();
Set<String> keys = formatProviderMap.keySet();
- assertTrue(keys.contains("avro"));
- assertTrue(keys.contains("csv"));
- assertTrue(keys.contains("json"));
- assertTrue(keys.contains("parquet"));
- assertTrue(keys.contains("xml"));
+ assertEquals(ImmutableSet.of(AVRO, CSV, JSON, PARQUET, XML), keys);
}
}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..e1cc231f934
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.AVRO;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider.INPUT_TAG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider.FileWriteSchemaTransform;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FileWriteSchemaTransformProvider}. */
+@RunWith(JUnit4.class)
+public class FileWriteSchemaTransformProviderTest {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+ private static final TypeDescriptor<FileWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR =
+ TypeDescriptor.of(FileWriteSchemaTransformConfiguration.class);
+ private static final SerializableFunction<FileWriteSchemaTransformConfiguration, Row> TO_ROW_FN =
+ AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+ private static final FileWriteSchemaTransformProvider PROVIDER =
+ new FileWriteSchemaTransformProvider();
+
+ @Rule
+ public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
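+ // The transform expects exactly one input tagged INPUT_TAG: both an empty tuple and a tuple
+ // carrying an extra tag should fail fast with the same error message.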
+ @Test
+ public void receivedUnexpectedInputTagsThrowsAnError() {
+ SchemaTransform transform =
+ PROVIDER.from(rowConfiguration(defaultConfiguration().setFormat(JSON).build()));
+ PCollectionRowTuple empty = PCollectionRowTuple.empty(errorPipeline);
+ IllegalArgumentException emptyInputError =
+ assertThrows(IllegalArgumentException.class, () -> empty.apply(transform.buildTransform()));
+
+ assertEquals(
+ "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection<Row> input",
+ emptyInputError.getMessage());
+
+ PCollection<Row> rows1 =
+ errorPipeline.apply(
+ Create.of(DATA.allPrimitiveDataTypesRows)
+ .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+ PCollection<Row> rows2 =
+ errorPipeline.apply(
+ Create.of(DATA.timeContainingRows).withRowSchema(TIME_CONTAINING_SCHEMA));
+
+ PCollectionRowTuple tooManyTags =
+ PCollectionRowTuple.of(INPUT_TAG, rows1).and("another", rows2);
+ IllegalArgumentException tooManyTagsError =
+ assertThrows(
+ IllegalArgumentException.class, () -> tooManyTags.apply(transform.buildTransform()));
+
+ assertEquals(
+ "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection<Row> input",
+ tooManyTagsError.getMessage());
+
+ // should not throw an error
+ PCollectionRowTuple input = PCollectionRowTuple.of(INPUT_TAG, rows1);
+ input.apply(transform.buildTransform());
+ }
+
+ @Test
+ public void formatMapsToFileWriteSchemaFormatTransform() {
+ Row avro = rowConfiguration(defaultConfiguration().setFormat(AVRO).build());
+ FileWriteSchemaTransformFormatProvider avroFormatProvider =
+ ((FileWriteSchemaTransform) PROVIDER.from(avro)).getProvider();
+ assertTrue(avroFormatProvider instanceof AvroWriteSchemaTransformFormatProvider);
+
+ Row json = rowConfiguration(defaultConfiguration().setFormat(JSON).build());
+ FileWriteSchemaTransformFormatProvider jsonFormatProvider =
+ ((FileWriteSchemaTransform) PROVIDER.from(json)).getProvider();
+ assertTrue(jsonFormatProvider instanceof JsonWriteSchemaTransformFormatProvider);
+
+ Row parquet = rowConfiguration(defaultConfiguration().setFormat(PARQUET).build());
+ FileWriteSchemaTransformFormatProvider parquetFormatProvider =
+ ((FileWriteSchemaTransform) PROVIDER.from(parquet)).getProvider();
+ assertTrue(parquetFormatProvider instanceof ParquetWriteSchemaTransformFormatProvider);
+
+ Row xml = rowConfiguration(defaultConfiguration().setFormat(XML).build());
+ FileWriteSchemaTransformFormatProvider xmlFormatProvider =
+ ((FileWriteSchemaTransform) PROVIDER.from(xml)).getProvider();
+ assertTrue(xmlFormatProvider instanceof XmlWriteSchemaTransformFormatProvider);
+ }
+
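+ // The provider consumes its configuration as a Row, so test configurations are converted via
+ // the AutoValue-derived schema's toRowFunction.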
+ private static Row rowConfiguration(FileWriteSchemaTransformConfiguration configuration) {
+ return TO_ROW_FN.apply(configuration);
+ }
+
+ private static FileWriteSchemaTransformConfiguration.Builder defaultConfiguration() {
+ return FileWriteSchemaTransformConfiguration.builder()
+ .setFilenamePrefix(FileWriteSchemaTransformProviderTest.class.getSimpleName());
+ }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..e380040565b
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.JSON;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link JsonWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class JsonFileWriteSchemaTransformFormatProviderTest
+ extends FileWriteSchemaTransformFormatProviderTest {
+ @Override
+ protected String getFormat() {
+ return JSON;
+ }
+
+ @Override
+ protected String getFilenamePrefix() {
+ return "/out";
+ }
+
+ @Override
+ protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+ PCollection<String> actual = readPipeline.apply(TextIO.read().from(folder + "*"));
+
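+ // Serialize the expected rows to JSON text for a string-level comparison; this assumes the
+ // provider encodes rows the same way as JsonPayloadSerializerProvider.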
+ PayloadSerializer payloadSerializer =
+ new JsonPayloadSerializerProvider().getSerializer(beamSchema, ImmutableMap.of());
+
+ PAssert.that(actual)
+ .containsInAnyOrder(
+ rows.stream()
+ .map(
+ (Row row) ->
+ new String(payloadSerializer.serialize(row), StandardCharsets.UTF_8))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+ return defaultConfiguration(folder);
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCompressionSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a json format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a json format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenNumShardsSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a json format");
+ }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..ea51f235c7f
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.PARQUET;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ParquetWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class ParquetFileWriteSchemaTransformFormatProviderTest
+ extends FileWriteSchemaTransformFormatProviderTest {
+ @Override
+ protected String getFormat() {
+ return PARQUET;
+ }
+
+ @Override
+ protected String getFilenamePrefix() {
+ return "";
+ }
+
+ @Override
+ protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
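+ // Parquet files are read back as Avro GenericRecords, so the expected Rows are converted with
+ // the same AvroUtils mapping before comparison.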
+ List<GenericRecord> expected =
+ rows.stream()
+ .map(AvroUtils.getRowToGenericRecordFunction(avroSchema)::apply)
+ .collect(Collectors.toList());
+
+ PCollection<GenericRecord> actual =
+ readPipeline.apply(
+ ParquetIO.read(avroSchema)
+ .from(folder + "/" + getFilenamePrefix() + "*")
+ .withProjection(avroSchema, avroSchema));
+
+ PAssert.that(actual).containsInAnyOrder(expected);
+ }
+
+ @Override
+ protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+ return FileWriteSchemaTransformConfiguration.builder()
+ .setParquetConfiguration(
+ parquetConfigurationBuilder()
+ .setCompressionCodecName(CompressionCodecName.GZIP.name())
+ .build())
+ .setFormat(getFormat())
+ .setFilenamePrefix(folder + getFilenamePrefix())
+ .build();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCompressionSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$XmlConfiguration is not compatible with a parquet format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenNumShardsSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a parquet format");
+ }
+}
diff --git a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
similarity index 56%
rename from sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
rename to sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
index 6081e783ec2..2a5584b89bc 100644
--- a/sdks/java/io/fileschematransform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProvidersTest.java
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlDateTimeAdapterTest.java
@@ -17,27 +17,31 @@
*/
package org.apache.beam.sdk.io.fileschematransform;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
-import java.util.Map;
-import java.util.Set;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link FileWriteSchemaTransformFormatProviders}. */
+/** Tests for {@link XmlDateTimeAdapter}. */
@RunWith(JUnit4.class)
-public class FileWriteSchemaTransformFormatProvidersTest {
+public class XmlDateTimeAdapterTest {
@Test
- public void testLoadProviders() {
- Map<String, FileWriteSchemaTransformFormatProvider> formatProviderMap =
- FileWriteSchemaTransformFormatProviders.loadProviders();
- Set<String> keys = formatProviderMap.keySet();
- assertTrue(keys.contains("avro"));
- assertTrue(keys.contains("csv"));
- assertTrue(keys.contains("json"));
- assertTrue(keys.contains("parquet"));
- assertTrue(keys.contains("xml"));
+ public void unmarshal() throws Exception {
+ XmlDateTimeAdapter adapter = new XmlDateTimeAdapter();
+ String dateTimeInput = "2022-12-29T21:04:51.171Z";
+ assertEquals(
+ Instant.ofEpochMilli(1672347891171L).toDateTime(), adapter.unmarshal(dateTimeInput));
+ }
+
+ @Test
+ public void marshal() throws Exception {
+ XmlDateTimeAdapter adapter = new XmlDateTimeAdapter();
+ DateTime dateTime = Instant.ofEpochMilli(1672347891171L).toDateTime(DateTimeZone.UTC);
+ assertEquals("2022-12-29T21:04:51.171Z", adapter.marshal(dateTime));
}
}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java
new file mode 100644
index 00000000000..87d59364702
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlFileWriteSchemaTransformFormatProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviders.XML;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.xml.XmlIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link XmlWriteSchemaTransformFormatProvider}. */
+@RunWith(JUnit4.class)
+public class XmlFileWriteSchemaTransformFormatProviderTest
+ extends FileWriteSchemaTransformFormatProviderTest {
+
+ private static final String ROOT_ELEMENT = "rootElement";
+ private static final String RECORD_ELEMENT = "row";
+
+ @Override
+ protected String getFormat() {
+ return XML;
+ }
+
+ @Override
+ protected String getFilenamePrefix() {
+ return "";
+ }
+
+ @Override
+ protected void assertFolderContainsInAnyOrder(String folder, List<Row> rows, Schema beamSchema) {
+ List<XmlRowAdapter> expected =
+ rows.stream()
+ .map(
+ (Row row) -> {
+ XmlRowAdapter result = new XmlRowAdapter();
+ result.wrapRow(row);
+ return result;
+ })
+ .collect(Collectors.toList());
+
+ PCollection<XmlRowAdapter> actual =
+ readPipeline.apply(
+ XmlIO.<XmlRowAdapter>read()
+ .from(folder + "/*")
+ .withRecordClass(XmlRowAdapter.class)
+ .withRootElement(ROOT_ELEMENT)
+ .withRecordElement(RECORD_ELEMENT)
+ .withCharset(Charset.defaultCharset()));
+
+ PAssert.that(actual).containsInAnyOrder(expected);
+ }
+
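+ // numShards is pinned to 1 so each test folder yields a single XML document whose full content
+ // can be asserted deterministically.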
+ @Override
+ protected FileWriteSchemaTransformConfiguration buildConfiguration(String folder) {
+ return FileWriteSchemaTransformConfiguration.builder()
+ .setFormat(XML)
+ .setXmlConfiguration(
+ FileWriteSchemaTransformConfiguration.xmlConfigurationBuilder()
+ .setRootElement(ROOT_ELEMENT)
+ .build())
+ .setFilenamePrefix(folder)
+ .setNumShards(1)
+ .build();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCompressionSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenParquetConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$ParquetConfiguration is not compatible with a xml format");
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenXmlConfigurationSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenNumShardsSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenShardNameTemplateSet() {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<String> expectedErrorWhenCsvConfigurationSet() {
+ return Optional.of(
+ "configuration with org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration$CsvConfiguration is not compatible with a xml format");
+ }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java
new file mode 100644
index 00000000000..a23476e0adb
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowAdapterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+/** Tests for {@link XmlRowAdapter}. */
+@RunWith(JUnit4.class)
+public class XmlRowAdapterTest {
+
+ @Test
+ public void allPrimitiveDataTypes()
+ throws XPathExpressionException, JAXBException, IOException, SAXException,
+ ParserConfigurationException {
+
+ for (Row row : DATA.allPrimitiveDataTypesRows) {
+ NodeList entries = xmlDocumentEntries(row);
+ assertEquals(ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames().size(), entries.getLength());
+ Map<String, Node> actualMap = keyValues("allPrimitiveDataTypes", entries);
+ assertEquals(
+ new HashSet<>(ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames()), actualMap.keySet());
+ for (Entry<String, Node> actualKV : actualMap.entrySet()) {
+ String key = actualKV.getKey();
+ Node node = actualKV.getValue();
+ String actual = node.getTextContent();
+ Optional<Object> safeExpected = Optional.ofNullable(row.getValue(key));
+ assertTrue(safeExpected.isPresent());
+ String expected = safeExpected.get().toString();
+ assertEquals(expected, actual);
+ }
+ }
+ }
+
+ @Test
+ public void nullableAllPrimitiveDataTypes()
+ throws XPathExpressionException, JAXBException, IOException, SAXException,
+ ParserConfigurationException {
+ for (Row row : DATA.nullableAllPrimitiveDataTypesRows) {
+ NodeList entries = xmlDocumentEntries(row);
+ assertEquals(
+ NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames().size(), entries.getLength());
+ Map<String, Node> actualMap = keyValues("nullableAllPrimitiveDataTypes", entries);
+ assertEquals(
+ new HashSet<>(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA.getFieldNames()),
+ actualMap.keySet());
+ for (Entry<String, Node> actualKV : actualMap.entrySet()) {
+ String key = actualKV.getKey();
+ Node node = actualKV.getValue();
+ String actual = node.getTextContent();
+ String expected = "";
+ Optional<Object> safeExpected = Optional.ofNullable(row.getValue(key));
+ if (safeExpected.isPresent()) {
+ expected = safeExpected.get().toString();
+ }
+ assertEquals(expected, actual);
+ }
+ }
+ }
+
+ @Test
+ public void timeContaining()
+ throws XPathExpressionException, JAXBException, IOException, ParserConfigurationException,
+ SAXException {
+ String instant = "instant";
+ DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
+ String instantList = "instantList";
+ for (Row row : DATA.timeContainingRows) {
+ Optional<TimeContaining> safeExpectedTimeContaining =
+ Optional.ofNullable(timeContainingFromRowFn().apply(row));
+ assertTrue(safeExpectedTimeContaining.isPresent());
+ TimeContaining expectedTimeContaining = safeExpectedTimeContaining.get();
+ NodeList entries = xmlDocumentEntries(row);
+ assertEquals(TIME_CONTAINING_SCHEMA.getFieldNames().size(), entries.getLength());
+ Map<String, Node> actualMap = keyValues("timeContaining", entries);
+ assertEquals(new HashSet<>(TIME_CONTAINING_SCHEMA.getFieldNames()), actualMap.keySet());
+ Node actualInstantNode = actualMap.get(instant);
+ String actual = actualInstantNode.getTextContent();
+ String expected = formatter.print(expectedTimeContaining.getInstant().toDateTime());
+ assertEquals(expected, actual);
+
+ List<String> actualInstantList =
+ toStringList(actualMap.get(instantList).getChildNodes()).stream()
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> expectedInstantList =
+ expectedTimeContaining.getInstantList().stream()
+ .map(Instant::toDateTime)
+ .map(formatter::print)
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(expectedInstantList, actualInstantList);
+ }
+ }
+
+ private static List<String> toStringList(NodeList nodes) {
+ List<String> result = new ArrayList<>();
+ for (int i = 0; i < nodes.getLength(); i++) {
+ Node node = nodes.item(i);
+ result.add(node.getTextContent());
+ }
+ return result;
+ }
+
+ private static Map<String, Node> keyValues(String testName, NodeList entries) {
+ Map<String, Node> result = new HashMap<>();
+ for (int i = 0; i < entries.getLength(); i++) {
+ NodeList kv = entries.item(i).getChildNodes();
+ assertEquals(testName, 2, kv.getLength());
+ String key = kv.item(0).getTextContent();
+ Node value = kv.item(1);
+ result.put(key, value);
+ }
+ return result;
+ }
+
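+ /**
+ * Marshals {@code row} through {@link XmlRowAdapter} via JAXB, then parses the XML back and
+ * returns the {@code /row/data/entry} nodes for assertion.
+ */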
+ private NodeList xmlDocumentEntries(Row row)
+ throws JAXBException, IOException, SAXException, XPathExpressionException,
+ ParserConfigurationException {
+ JAXBContext context = JAXBContext.newInstance(XmlRowAdapter.class);
+ Marshaller marshaller = context.createMarshaller();
+ DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder documentBuilder = builderFactory.newDocumentBuilder();
+ XPath entryPath = XPathFactory.newInstance().newXPath();
+ XPathExpression entryPathExpression = entryPath.compile("/row/data/entry");
+ XmlRowAdapter adapter = new XmlRowAdapter();
+ adapter.wrapRow(row);
+ StringWriter writer = new StringWriter();
+ marshaller.marshal(adapter, writer);
+ String content = writer.toString();
+ Document xmlDocument = documentBuilder.parse(new InputSource(new StringReader(content)));
+ return (NodeList) entryPathExpression.evaluate(xmlDocument, XPathConstants.NODESET);
+ }
+}
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java
new file mode 100644
index 00000000000..fbe88359f95
--- /dev/null
+++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/XmlRowValueTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link XmlRowValue}. */
+@RunWith(JUnit4.class)
+public class XmlRowValueTest {
+
+ @Test
+ public void allPrimitiveDataTypes() {
+ String[] fields = {
+ "aBoolean", "aByte", "aDecimal", "aDouble", "aFloat", "aShort", "anInteger", "aLong", "aString"
+ };
+ for (Row row : DATA.allPrimitiveDataTypesRows) {
+ // Every primitive field should round-trip through XmlRowValue unchanged.
+ for (String field : fields) {
+ XmlRowValue value = new XmlRowValue();
+ value.setValue(field, row);
+ assertEquals(field, row.getValue(field), value.getPrimitiveValue());
+ }
+ }
+ }
+
+ @Test
+ public void nullableAllPrimitiveDataTypes() {
+ String[] fields = {"aBoolean", "aDouble", "aFloat", "anInteger", "aLong", "aString"};
+ for (Row row : DATA.nullableAllPrimitiveDataTypesRows) {
+ // Null field values should surface as null primitive values rather than throw.
+ for (String field : fields) {
+ XmlRowValue value = new XmlRowValue();
+ value.setValue(field, row);
+ assertEquals(field, row.getValue(field), value.getPrimitiveValue());
+ }
+ }
+ }
+
+ @Test
+ public void arrayPrimitiveDataTypes() {
+ String[] fields = {
+ "booleanList", "doubleList", "floatList", "shortList", "integerList", "longList", "stringList"
+ };
+ for (Row row : DATA.arrayPrimitiveDataTypesRows) {
+ for (String field : fields) {
+ XmlRowValue value = new XmlRowValue();
+ value.setValue(field, row);
+ Optional<ArrayList<XmlRowValue>> valueList = Optional.ofNullable(value.getValueList());
+ assertTrue(field, valueList.isPresent());
+ // The wrapped list should preserve both the order and the values of the original array.
+ assertEquals(
+ field,
+ row.getArray(field),
+ valueList.get().stream()
+ .map(XmlRowValue::getPrimitiveValue)
+ .collect(Collectors.toList()));
+ }
+ }
+ }
+
+ @Test
+ public void timeContaining() {
+ String instant = "instant";
+ String instantList = "instantList";
+ for (Row row : DATA.timeContainingRows) {
+ XmlRowValue instantValue = new XmlRowValue();
+ instantValue.setValue(instant, row);
+ Optional<ReadableDateTime> expected = Optional.ofNullable(row.getDateTime(instant));
+ Optional<DateTime> actual = Optional.ofNullable(instantValue.getDateTimeValue());
+ assertTrue(instant, expected.isPresent());
+ assertTrue(instant, actual.isPresent());
+ assertEquals(instant, expected.get().getMillis(), actual.get().getMillis());
+
+ XmlRowValue instantListValue = new XmlRowValue();
+ instantListValue.setValue(instantList, row);
+ Optional<Collection<Instant>> expectedList = Optional.ofNullable(row.getArray(instantList));
+ Optional<List<XmlRowValue>> actualList = Optional.ofNullable(instantListValue.getValueList());
+ assertTrue(instantList, expectedList.isPresent());
+ assertTrue(instantList, actualList.isPresent());
+ assertFalse(instantList, expectedList.get().isEmpty());
+ assertFalse(instantList, actualList.get().isEmpty());
+
+ assertEquals(
+ instantList,
+ expectedList.get().stream().map(Instant::getMillis).collect(Collectors.toList()),
+ dateTimes(actualList.get()).stream()
+ .map(DateTime::getMillis)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ @Test
+ public void singlyNestedDataTypesNoRepeat() {
+ String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+ String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+ for (Row row : DATA.singlyNestedDataTypesNoRepeatRows) {
+ XmlRowValue allPrimitiveDataTypesValue = new XmlRowValue();
+ allPrimitiveDataTypesValue.setValue(allPrimitiveDataTypes, row);
+ Optional<Row> expectedAllPrimitiveDataTypes =
+ Optional.ofNullable(row.getRow(allPrimitiveDataTypes));
+ Optional<Map<String, XmlRowValue>> actualAllPrimitiveDataTypes =
+ Optional.ofNullable(allPrimitiveDataTypesValue.getNestedValue());
+ assertTrue(allPrimitiveDataTypes, expectedAllPrimitiveDataTypes.isPresent());
+ assertTrue(allPrimitiveDataTypes, actualAllPrimitiveDataTypes.isPresent());
+ assertEquals(
+ allPrimitiveDataTypes,
+ values(expectedAllPrimitiveDataTypes.get()),
+ values(actualAllPrimitiveDataTypes.get()));
+
+ XmlRowValue allPrimitiveDataTypesValueList = new XmlRowValue();
+ allPrimitiveDataTypesValueList.setValue(allPrimitiveDataTypesList, row);
+ Optional<Collection<Row>> expectedAllPrimitiveDataTypesList =
+ Optional.ofNullable(row.getArray(allPrimitiveDataTypesList));
+ Optional<ArrayList<XmlRowValue>> actualAllPrimitiveDataTypesList =
+ Optional.ofNullable(allPrimitiveDataTypesValueList.getValueList());
+
+ assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.isPresent());
+ assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.isPresent());
+ assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.get().isEmpty());
+ assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.get().isEmpty());
+ }
+ }
+
+ @Test
+ public void singlyNestedDataTypesRepeat() {
+ String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+ String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+ for (Row row : DATA.singlyNestedDataTypesRepeatedRows) {
+ XmlRowValue allPrimitiveDataTypesValue = new XmlRowValue();
+ allPrimitiveDataTypesValue.setValue(allPrimitiveDataTypes, row);
+ Optional<Row> expectedAllPrimitiveDataTypes =
+ Optional.ofNullable(row.getRow(allPrimitiveDataTypes));
+ Optional<Map<String, XmlRowValue>> actualAllPrimitiveDataTypes =
+ Optional.ofNullable(allPrimitiveDataTypesValue.getNestedValue());
+ assertTrue(allPrimitiveDataTypes, expectedAllPrimitiveDataTypes.isPresent());
+ assertTrue(allPrimitiveDataTypes, actualAllPrimitiveDataTypes.isPresent());
+ assertEquals(
+ allPrimitiveDataTypes,
+ values(expectedAllPrimitiveDataTypes.get()),
+ values(actualAllPrimitiveDataTypes.get()));
+
+ XmlRowValue allPrimitiveDataTypesValueList = new XmlRowValue();
+ allPrimitiveDataTypesValueList.setValue(allPrimitiveDataTypesList, row);
+ Optional<Collection<Row>> expectedAllPrimitiveDataTypesList =
+ Optional.ofNullable(row.getArray(allPrimitiveDataTypesList));
+ Optional<List<XmlRowValue>> actualAllPrimitiveDataTypesList =
+ Optional.ofNullable(allPrimitiveDataTypesValueList.getValueList());
+
+ assertTrue(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.isPresent());
+ assertTrue(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.isPresent());
+ assertFalse(allPrimitiveDataTypesList, expectedAllPrimitiveDataTypesList.get().isEmpty());
+ assertFalse(allPrimitiveDataTypesList, actualAllPrimitiveDataTypesList.get().isEmpty());
+ assertEquals(
+ allPrimitiveDataTypesList,
+ valuesList(expectedAllPrimitiveDataTypesList.get()),
+ valuesList(actualAllPrimitiveDataTypesList.get()));
+ }
+ }
+
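+ /** Verifies a doubly nested row exposes both its nested row field and its repeated field after conversion. */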
+ @Test
+ public void doublyNestedDataTypesNoRepeat() {
+ String singlyNestedDataTypes = "singlyNestedDataTypes";
+ String allPrimitiveDataTypes = "allPrimitiveDataTypes";
+ String allPrimitiveDataTypesList = "allPrimitiveDataTypesList";
+ for (Row row : DATA.doublyNestedDataTypesNoRepeatRows) {
+ XmlRowValue singlyNestedDataTypesValue = new XmlRowValue();
+ singlyNestedDataTypesValue.setValue(singlyNestedDataTypes, row);
+ Optional<Row> expectedSinglyNestedDataTypesValue =
+ Optional.ofNullable(row.getRow(singlyNestedDataTypes));
+ Optional<Map<String, XmlRowValue>> actualSinglyNestedDataTypesValue =
+ Optional.ofNullable(singlyNestedDataTypesValue.getNestedValue());
+ assertTrue(singlyNestedDataTypes, expectedSinglyNestedDataTypesValue.isPresent());
+ assertTrue(singlyNestedDataTypes, actualSinglyNestedDataTypesValue.isPresent());
+ assertNotNull(
+ singlyNestedDataTypes,
+ expectedSinglyNestedDataTypesValue.get().getValue(allPrimitiveDataTypes));
+ assertNotNull(
+ singlyNestedDataTypes, actualSinglyNestedDataTypesValue.get().get(allPrimitiveDataTypes));
+ assertNotNull(
+ singlyNestedDataTypes,
+ expectedSinglyNestedDataTypesValue.get().getValue(allPrimitiveDataTypesList));
+ assertNotNull(
+ singlyNestedDataTypes,
+ actualSinglyNestedDataTypesValue.get().get(allPrimitiveDataTypesList));
+ }
+ }
+
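+ /** Maps each field name of a {@link Row} to its raw value. */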
+ private static Map<String, Object> values(Row row) {
+ Map<String, Object> result = new HashMap<>();
+ Schema schema = row.getSchema();
+ for (String key : schema.getFieldNames()) {
+ result.put(key, row.getValue(key));
+ }
+ return result;
+ }
+
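+ /** Applies {@link #values(Row)} to every {@link Row} in the collection. */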
+ private static List<Map<String, Object>> valuesList(Collection<Row> values) {
+ return values.stream().map(XmlRowValueTest::values).collect(Collectors.toList());
+ }
+
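+ /** Maps each key of a nested {@link XmlRowValue} map to its primitive value. */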
+ private static Map<String, Object> values(Map<String, XmlRowValue> nested) {
+ Map<String, Object> result = new HashMap<>();
+ for (String key : nested.keySet()) {
+ result.put(key, nested.get(key).getPrimitiveValue());
+ }
+ return result;
+ }
+
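+ /** Collects the nested value maps of each {@link XmlRowValue}, skipping items without one. */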
+ private static List<Map<String, Object>> valuesList(List<XmlRowValue> nestedList) {
+ List<Map<String, Object>> result = new ArrayList<>();
+ for (XmlRowValue item : nestedList) {
+ Optional<Map<String, XmlRowValue>> nestedValues = Optional.ofNullable(item.getNestedValue());
+ nestedValues.ifPresent(stringXmlRowValueMap -> result.add(values(stringXmlRowValueMap)));
+ }
+ return result;
+ }
+
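+ /** Collects the non-null {@link DateTime} values of each {@link XmlRowValue}. */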
+ private static List<DateTime> dateTimes(List<XmlRowValue> dateTimeValues) {
+ List<DateTime> result = new ArrayList<>();
+ for (XmlRowValue item : dateTimeValues) {
+ Optional<DateTime> value = Optional.ofNullable(item.getDateTimeValue());
+ value.ifPresent(result::add);
+ }
+ return result;
+ }
+}
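The tests above pin down the accessor contract of XmlRowValue. A minimal sketch of that contract, using only the methods exercised in this test file (the Row named `row` is assumed to carry the matching fields):

    XmlRowValue value = new XmlRowValue();
    value.setValue("aString", row);  // copies the named field out of the Row
    value.getPrimitiveValue();       // non-null when the field holds a primitive
    value.getNestedValue();          // non-null Map<String, XmlRowValue> when the field is a ROW
    value.getValueList();            // non-null List<XmlRowValue> when the field is an ARRAY
    value.getDateTimeValue();        // non-null DateTime when the field is a DATE_TIME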
diff --git a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java b/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
deleted file mode 100644
index f8763725e45..00000000000
--- a/sdks/java/io/fileschematransform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformFormatProviders.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fileschematransform;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.io.Providers;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@link FileWriteSchemaTransformFormatProviders} contains {@link
- * FileWriteSchemaTransformFormatProvider} implementations.
- *
- * <p>The design goals of this class are to enable clean {@link
- * FileWriteSchemaTransformConfiguration#getFormat()} lookups that map to the appropriate {@link
- * org.apache.beam.sdk.io.FileIO.Write} that encodes the file data into the configured format.
- */
-@Internal
-public final class FileWriteSchemaTransformFormatProviders {
- private static final String AVRO = "avro";
- private static final String CSV = "csv";
- private static final String JSON = "json";
- private static final String PARQUET = "parquet";
- private static final String XML = "xml";
-
- /** Load all {@link FileWriteSchemaTransformFormatProvider} implementations. */
- public static Map<String, FileWriteSchemaTransformFormatProvider> loadProviders() {
- return Providers.loadProviders(FileWriteSchemaTransformFormatProvider.class);
- }
-
- /** A {@link FileWriteSchemaTransformFormatProvider} for avro format. */
- @AutoService(FileWriteSchemaTransformFormatProvider.class)
- public static class Avro implements FileWriteSchemaTransformFormatProvider {
- @Override
- public String identifier() {
- return AVRO;
- }
-
- @Override
- public PTransform<PCollection<?>, PDone> buildTransform() {
- // TODO(https://github.com/apache/beam/issues/24472)
- throw new UnsupportedOperationException();
- }
- }
-
- /** A {@link FileWriteSchemaTransformFormatProvider} for CSV format. */
- @AutoService(FileWriteSchemaTransformFormatProvider.class)
- public static class Csv implements FileWriteSchemaTransformFormatProvider {
-
- @Override
- public String identifier() {
- return CSV;
- }
-
- @Override
- public PTransform<PCollection<?>, PDone> buildTransform() {
- // TODO(https://github.com/apache/beam/issues/24472)
- throw new UnsupportedOperationException();
- }
- }
-
- /** A {@link FileWriteSchemaTransformFormatProvider} for JSON format. */
- @AutoService(FileWriteSchemaTransformFormatProvider.class)
- public static class Json implements FileWriteSchemaTransformFormatProvider {
- @Override
- public String identifier() {
- return JSON;
- }
-
- @Override
- public PTransform<PCollection<?>, PDone> buildTransform() {
- // TODO(https://github.com/apache/beam/issues/24472)
- throw new UnsupportedOperationException();
- }
- }
-
- /** A {@link FileWriteSchemaTransformFormatProvider} for Parquet format. */
- @AutoService(FileWriteSchemaTransformFormatProvider.class)
- public static class Parquet implements FileWriteSchemaTransformFormatProvider {
- @Override
- public String identifier() {
- return PARQUET;
- }
-
- @Override
- public PTransform<PCollection<?>, PDone> buildTransform() {
- // TODO(https://github.com/apache/beam/issues/24472)
- throw new UnsupportedOperationException();
- }
- }
-
- /** A {@link FileWriteSchemaTransformFormatProvider} for XML format. */
- @AutoService(FileWriteSchemaTransformFormatProvider.class)
- public static class Xml implements FileWriteSchemaTransformFormatProvider {
- @Override
- public String identifier() {
- return XML;
- }
-
- @Override
- public PTransform<PCollection<?>, PDone> buildTransform() {
- // TODO(https://github.com/apache/beam/issues/24472)
- throw new UnsupportedOperationException();
- }
- }
-}
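The deleted Javadoc describes the intended pattern: resolve a provider by the configured format string. A minimal sketch of that lookup, assuming the loadProviders() registry keyed by identifier() carries over to the replacement (the names below come from the deleted file above, not from the new sources):

    Map<String, FileWriteSchemaTransformFormatProvider> providers =
        FileWriteSchemaTransformFormatProviders.loadProviders();
    // configuration.getFormat() is expected to be one of "avro", "csv", "json", "parquet", "xml"
    FileWriteSchemaTransformFormatProvider provider = providers.get(configuration.getFormat());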
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4455981a3fd..b8879bfa3f3 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -185,7 +185,7 @@ include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
include(":sdks:java:io:cdap")
include(":sdks:java:io:csv")
-include(":sdks:java:io:fileschematransform")
+include(":sdks:java:io:file-schema-transform")
include(":sdks:java:io:google-cloud-platform")
include(":sdks:java:io:google-cloud-platform:expansion-service")
include(":sdks:java:io:hadoop-common")