Posted to github@beam.apache.org by "lukecwik (via GitHub)" <gi...@apache.org> on 2023/01/23 23:15:36 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #24630: [24469] Implement CsvIO.Write and supporting classes

lukecwik commented on code in PR #24630:
URL: https://github.com/apache/beam/pull/24630#discussion_r1084642090


##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
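+ * <p>For illustration, a hypothetical {@code Order} class such as the following could not be
+ * written by {@link CsvIO.Write}, because its inferred {@link Schema} would contain a nested row
+ * and a repeated field:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Order {
+ *   // A nested bean maps to TypeName#ROW and is therefore not writable.
+ *   public Address getShippingAddress() { … }
+ *   // A List maps to TypeName#ARRAY and is therefore not writable.
+ *   public List<String> getTags() { … }
+ * }
+ * }</pre>
+ *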
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for
+ * every shard file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is case
+ *       sensitive.
+ *   <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s;
+ *       see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames()} is set, as shown in the sketch below.
+ * </ol>
+ *
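+ * <p>A minimal sketch of constraint 3, reusing the {@code transactions} collection above and
+ * opting into duplicate header columns:
+ *
+ * <pre>{@code
+ * transactions.apply(
+ *     CsvIO.<Transaction>write(
+ *         "path/to/folder/prefix",
+ *         CSVFormat.DEFAULT.withAllowDuplicateHeaderNames(true).withHeader("bank", "bank")));
+ * }</pre>
+ *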
+ * <p>The following example shows the use of {@link CSVFormat#withHeader(String...)} to control the
+ * order and subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * shard file, but only including the subset of fields in their listed order.

Review Comment:
   ```suggestion
    * file, but will only include the subset of fields in their listed order.
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.

Review Comment:
   ```suggestion
    * <p>{@link CsvIO.Write} only supports writing the parts of {@link Schema} aware types that do not contain any
    * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
    * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
    * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIO.VALID_FIELD_TYPE_SET;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Contains classes and methods to help with converting between {@link Row} and CSV strings. */
+class CsvRowConversions {
+
+  /** Converts between {@link Row} and CSV string using a {@link CSVFormat}. */
+  @AutoValue
+  abstract static class RowToCsv implements SerializableFunction<Row, String> {
+
+    static Builder builder() {
+      return new AutoValue_CsvRowConversions_RowToCsv.Builder();
+    }
+
+    /** The expected {@link Schema} of the {@link Row} input. */
+    abstract Schema getSchema();
+
+    /** The {@link CSVFormat} of the converted {@link Row} input. */
+    abstract CSVFormat getCSVFormat();
+
+    /** Converts a {@link Row} to a CSV string formatted using {@link #getCSVFormat()}. */
+    @Override
+    public String apply(Row input) {
+      Row safeInput = checkNotNull(input);
+      String[] header = getHeader();
+      Object[] values = new Object[header.length];
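+      // Look up each configured header column's value in the input Row by field name, preserving
+      // the header's declared order.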
+      for (int i = 0; i < header.length; i++) {
+        values[i] = safeInput.getValue(header[i]);
+      }
+      return getCSVFormat().format(values);
+    }
+
+    @NonNull
+    String[] getHeader() {
+      return checkNotNull(getCSVFormat().getHeader());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /** The expected {@link Schema} of the {@link Row} input. */
+      abstract Builder setSchema(Schema schema);
+
+      abstract Schema getSchema();
+
+      /** The {@link CSVFormat} of the converted {@link Row} input. */
+      abstract Builder setCSVFormat(CSVFormat format);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract RowToCsv autoBuild();
+
+      final RowToCsv build() {
+        checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields");
+        setCSVFormat(
+            getCSVFormat()
+                .withSkipHeaderRecord()
+                // Delegate to TextIO.Write.withDelimiter instead.
+                .withRecordSeparator((char) 32)

Review Comment:
   Why is this necessary?
   ```suggestion
                   .withRecordSeparator(' ')
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for
+ * every shard file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is case
+ *       sensitive.
+ *   <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s;
+ *       see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames()} is set.
+ * </ol>
+ *
+ * <p>The following example shows the use of {@link CSVFormat#withHeader(String...)} to control the
+ * order and subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * shard file, but only including the subset of fields in their listed order.
+ *
+ * <pre>{@code
+ * transactionId,purchaseAmount
+ * 12345,10.23
+ * 54321,54.65
+ * 98765,11.76
+ * }</pre>
+ *
+ * <p>In addition to header customization, {@link CsvIO.Write} supports {@link
+ * CSVFormat#withHeaderComments(Object...)} as shown below. Note that {@link
+ * CSVFormat#withCommentMarker(char)} is required when specifying header comments.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions
+ *    .apply(
+ *        CsvIO.<Transaction>write("path/to/folder/prefix",
+ *            CSVFormat.DEFAULT.withCommentMarker('#').withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
+ *    );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header and header comments are
+ * repeated for every shard file.
+ *
+ * <pre>{@code
+ * # Bank Report
+ * # 1970-01-01
+ * # Operator: John Doe
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>A {@link PCollection} of {@link Row}s works just like custom Java types illustrated above,
+ * except we use {@link CsvIO#writeRows(String, CSVFormat)} as shown below for the same {@code
+ * Transaction} class. We derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction(TypeDescriptor)}.
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", "12345")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", "54321")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", "98765")
+ *    .build()
+ * ));
+ *
+ * transactions.apply(
+ *  CsvIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
+ * );
+ * }</pre>
+ *
+ * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV
+ * file content.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
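+ * <p>A minimal sketch of the {@code toRowFunction} approach encouraged above, assuming a
+ * hypothetical {@code PCollection<Transaction>} named {@code input} and the {@code schema} and
+ * {@code defaultSchemaProvider} variables from the previous example:
+ *
+ * <pre>{@code
+ * SerializableFunction<Transaction, Row> toRow =
+ *     defaultSchemaProvider.toRowFunction(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> rows =
+ *     input
+ *         .apply(MapElements.into(TypeDescriptors.rows()).via(toRow))
+ *         .setRowSchema(schema);
+ * }</pre>
+ *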
+ * <p>{@link CsvIO.Write} does not support the following {@link CSVFormat} properties and will throw an
+ * {@link IllegalArgumentException}.
+ *
+ * <ul>
+ *   <li>{@link CSVFormat#withSkipHeaderRecord()}
+ *   <li>{@link CSVFormat#withAllowMissingColumnNames()}
+ *   <li>{@link CSVFormat#withAutoFlush(boolean)}
+ *   <li>{@link CSVFormat#withIgnoreHeaderCase()}
+ *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces()}
+ * </ul>
+ */
+public class CsvIO {
+  /**
+   * The valid {@link Schema.FieldType}s that {@link CsvIO} supports when converting between CSV
+   * records and {@link Row} fields.
+   *
+   * <ul>
+   *   <li>{@link FieldType#BYTE}
+   *   <li>{@link FieldType#BOOLEAN}
+   *   <li>{@link FieldType#DATETIME}
+   *   <li>{@link FieldType#DECIMAL}
+   *   <li>{@link FieldType#DOUBLE}
+   *   <li>{@link FieldType#INT16}
+   *   <li>{@link FieldType#INT32}
+   *   <li>{@link FieldType#INT64}
+   *   <li>{@link FieldType#FLOAT}
+   *   <li>{@link FieldType#STRING}
+   * </ul>
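+   *
+   * <p>For example, a pipeline author could pre-validate a {@link Schema} against this set (a
+   * sketch; nullability is normalized because {@link FieldType} equality takes it into account):
+   *
+   * <pre>{@code
+   * boolean writable =
+   *     schema.getFields().stream()
+   *         .allMatch(f -> CsvIO.VALID_FIELD_TYPE_SET.contains(f.getType().withNullable(false)));
+   * }</pre>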
+   */
+  public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET =
+      ImmutableSet.of(
+          FieldType.BYTE,
+          FieldType.BOOLEAN,
+          FieldType.DATETIME,
+          FieldType.DECIMAL,
+          FieldType.DOUBLE,
+          FieldType.INT16,
+          FieldType.INT32,
+          FieldType.INT64,
+          FieldType.FLOAT,
+          FieldType.STRING);
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in {@link CSVFormat} format. */
+  public static <T> Write<T> write(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<T>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** Instantiates a {@link Write} for writing {@link Row}s in {@link CSVFormat} format. */
+  public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<Row>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** {@link PTransform} for writing CSV files. */
+  @AutoValue
+  public abstract static class Write<T>
+      extends PTransform<PCollection<T>, WriteFilesResult<String>> {
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build();
+    }
+
+    public Write<T> withNoSpilling() {

Review Comment:
   javadoc?



##########
sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/RowToCsvCSVFormatTest.java:
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.commons.csv.CSVFormat} settings in the context of {@link
+ * CsvRowConversions.RowToCsv}.
+ */
+@RunWith(JUnit4.class)
+public class RowToCsvCSVFormatTest {
+  @Test
+  public void invalidCSVFormatHeader() {
+    NullPointerException nullHeaderError =
+        assertThrows(
+            NullPointerException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT)
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+    assertEquals("CSVFormat withHeader is required", nullHeaderError.getMessage());
+
+    IllegalArgumentException emptyHeaderError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader())
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "CSVFormat withHeader requires at least one column", emptyHeaderError.getMessage());
+
+    IllegalArgumentException mismatchHeaderError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(
+                        CSVFormat.DEFAULT.withHeader("aString", "idontexist1", "idontexist2"))
+                    .setSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "columns in CSVFormat header do not exist in Schema: idontexist2,idontexist1",
+        mismatchHeaderError.getMessage());
+  }
+
+  @Test
+  public void invalidSchema() {
+    IllegalArgumentException emptySchemaError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader())
+                    .setSchema(Schema.of())
+                    .build());
+
+    assertEquals("Schema has no fields", emptySchemaError.getMessage());
+
+    IllegalArgumentException invalidArrayFieldsError =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                CsvRowConversions.RowToCsv.builder()
+                    .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant", "instantList"))
+                    .setSchema(TIME_CONTAINING_SCHEMA)
+                    .build());
+
+    assertEquals(
+        "columns in header match fields in Schema with invalid types: instantList. See CsvIO#VALID_FIELD_TYPE_SET for a list of valid field types.",
+        invalidArrayFieldsError.getMessage());
+
+    // Should not throw Exception when limited to valid fields.
+    CsvRowConversions.RowToCsv.builder()
+        .setCSVFormat(CSVFormat.DEFAULT.withHeader("instant"))
+        .setSchema(TIME_CONTAINING_SCHEMA)
+        .build();
+  }
+
+  @Test
+  public void withAllowDuplicateHeaderNamesDuplicatesRowFieldOutput() {
+    assertEquals(
+        "allowDuplicateHeaderNames=true",
+        "a,a,a",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withAllowDuplicateHeaderNames(true)
+                .withHeader("aString", "aString", "aString")));
+  }
+
+  @Test
+  public void withAllowMissingColumnNamesSettingThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            "allowMissingColumnNames=true",
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAllowMissingColumnNames(true)));
+
+    assertEquals(
+        "withAllowMissingColumnNames is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withAutoFlushThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            "autoFlush=true",
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withAutoFlush(true)));
+
+    assertEquals("withAutoFlush is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withCommentMarkerDoesNotEffectConversion() {
+    Schema schema = Schema.of(Field.of("aString", FieldType.STRING));
+    Row row = Row.withSchema(schema).attachValues("$abc");
+    assertEquals("$abc", rowToCsv(row, csvFormat(schema).withCommentMarker('$')));
+    assertEquals("$abc", rowToCsv(row, csvFormat(schema).withCommentMarker(null)));
+  }
+
+  @Test
+  public void withDelimiterDrivesCellBorders() {
+    assertEquals(
+        "false~1~10~1.0~1.0~1~1~a~1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withDelimiter('~')));
+  }
+
+  @Test
+  public void withEscapeDrivesOutput() {
+    Schema schema =
+        Schema.of(Field.of("aString", FieldType.STRING), Field.of("anInt", FieldType.INT32));
+    Row row = Row.withSchema(schema).attachValues(",a", 1);
+    String[] header = new String[] {"anInt", "aString"};
+    assertEquals(
+        "1,#,a",
+        rowToCsv(
+            row,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeader(header)
+                .withEscape('#')
+                .withQuoteMode(QuoteMode.NONE)));
+    assertEquals(
+        "1,\",a\"", rowToCsv(row, csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withHeader(header)));
+  }
+
+  @Test
+  public void withHeaderDrivesFieldOrderSubsetOutput() {
+    assertEquals(
+        "1,false,a",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeader("anInteger", "aBoolean", "aString")));
+  }
+
+  @Test
+  public void withHeaderCommentsDoesNotEffectConversion() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA)
+                .withHeaderComments("some", "header", "comments")));
+  }
+
+  @Test
+  public void withIgnoreEmptyLinesDoesNotEffectOutput() {
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(true)));
+    assertEquals(
+        "false,1,10,1.0,1.0,1,1,a,1",
+        rowToCsv(
+            DATA.allPrimitiveDataTypesRow,
+            csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreEmptyLines(false)));
+  }
+
+  @Test
+  public void withIgnoreHeaderCaseThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreHeaderCase(true)));
+    assertEquals("withIgnoreHeaderCase is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withIgnoreSurroundingSpacesThrowsException() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                rowToCsv(
+                    DATA.allPrimitiveDataTypesRow,
+                    csvFormat(ALL_PRIMITIVE_DATA_TYPES_SCHEMA).withIgnoreSurroundingSpaces(true)));
+    assertEquals(
+        "withIgnoreSurroundingSpaces is an illegal CSVFormat setting", exception.getMessage());
+  }
+
+  @Test
+  public void withNullStringReplacesNullValues() {
+    assertEquals(
+        "🦄,🦄,🦄,🦄,🦄,🦄",

Review Comment:
   nice



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for
+ * every shard file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is case
+ *       sensitive.
+ *   <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s;
+ *       see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames()} is set.
+ * </ol>
+ *
+ * <p>The following example shows the use of {@link CSVFormat#withHeader(String...)} to control the
+ * order and subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * shard file, but only including the subset of fields in their listed order.
+ *
+ * <pre>{@code
+ * transactionId,purchaseAmount
+ * 12345,10.23
+ * 54321,54.65
+ * 98765,11.76
+ * }</pre>
+ *
+ * <p>In addition to header customization, {@link CsvIO.Write} supports {@link
+ * CSVFormat#withHeaderComments(Object...)} as shown below. Note that {@link
+ * CSVFormat#withCommentMarker(char)} is required when specifying header comments.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions
+ *    .apply(
+ *        CsvIO.<Transaction>write("path/to/folder/prefix",
+ *            CSVFormat.DEFAULT.withCommentMarker('#').withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
+ *    );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header and header comments are
+ * repeated for every shard file.
+ *
+ * <pre>{@code
+ * # Bank Report
+ * # 1970-01-01
+ * # Operator: John Doe
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>A {@link PCollection} of {@link Row}s works just like custom Java types illustrated above,
+ * except we use {@link CsvIO#writeRows(String, CSVFormat)} as shown below for the same {@code
+ * Transaction} class. We derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction(TypeDescriptor)}.
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", "12345")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", "54321")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", "98765")
+ *    .build()
+ * ));
+ *
+ * transactions.apply(
+ *  CsvIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
+ * );
+ * }</pre>
+ *
+ * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV
+ * file content.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * {@link CsvIO.Write} does not support the following {@link CSVFormat} properties and will throw an
+ * {@link IllegalArgumentException}.
+ *
+ * <ul>
+ *   <li>{@link CSVFormat#withSkipHeaderRecord()}
+ *   <li>{@link CSVFormat#withAllowMissingColumnNames()} ()}
+ *   <li>{@link CSVFormat#withAutoFlush(boolean)}
+ *   <li>{@link CSVFormat#withIgnoreHeaderCase()}
+ *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces()}

Review Comment:
   ```suggestion
    *   <li>{@link CSVFormat#withSkipHeaderRecord}
    *   <li>{@link CSVFormat#withAllowMissingColumnNames}
    *   <li>{@link CSVFormat#withAutoFlush}
    *   <li>{@link CSVFormat#withIgnoreHeaderCase}
    *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces}
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for
+ * every shard file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:

Review Comment:
   Note that you can drop the parameter types like (String...) in the above. It helps keep the javadoc links working longer and it is only useful if you want to specify an exact version of an overloaded method.



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, created via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema} aware types that do not contain any
+ * nested {@link FieldType}s such a {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO.Write#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on its inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for
+ * every shard file. By default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is case
+ *       sensitive.
+ *   <li>Matching header columns must match {@link Schema} fields that are valid {@link FieldType}s;
+ *       see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames()} is set.
+ * </ol>
+ *
+ * <p>The following example shows the use of {@link CSVFormat#withHeader(String...)} to control the
+ * order and subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * shard file, but only including the subset of fields in their listed order.
+ *
+ * <pre>{@code
+ * transactionId,purchaseAmount
+ * 12345,10.23
+ * 54321,54.65
+ * 98765,11.76
+ * }</pre>
+ *
+ * <p>In addition to header customization, {@link CsvIO.Write} supports {@link
+ * CSVFormat#withHeaderComments(Object...)} as shown below. Note that {@link
+ * CSVFormat#withCommentMarker(char)} is required when specifying header comments.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions
+ *    .apply(
+ *        CsvIO.<Transaction>write("path/to/folder/prefix",
+ *        CSVFormat.DEFAULT.withCommentMarker('#').withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
+ *    );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header and header comments are
+ * repeated for every shard file.
+ *
+ * <pre>{@code
+ * # Bank Report
+ * # 1970-01-01
+ * # Operator: John Doe
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>A {@link PCollection} of {@link Row}s works just like the custom Java types illustrated
+ * above, except that we use {@link CsvIO#writeRows(String, CSVFormat)}, as shown below for the
+ * same {@code Transaction} class. We derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction(TypeDescriptor)}.
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", 12345L)
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", 54321L)
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", 98765L)
+ *    .build()
+ * ));
+ *
+ * transactions.apply(
+ *  CsvIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
+ * );
+ * }</pre>
+ *
+ * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV
+ * file content.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>{@link CsvIO.Write} does not support the following {@link CSVFormat} properties and will
+ * throw an {@link IllegalArgumentException} if any of them are set.
+ *
+ * <ul>
+ *   <li>{@link CSVFormat#withSkipHeaderRecord()}
+ *   <li>{@link CSVFormat#withAllowMissingColumnNames()}
+ *   <li>{@link CSVFormat#withAutoFlush(boolean)}
+ *   <li>{@link CSVFormat#withIgnoreHeaderCase()}
+ *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces()}
+ * </ul>
+ */
+public class CsvIO {
+  /**
+   * The valid {@link Schema.FieldType}s that {@link CsvIO} supports when converting between
+   * {@link Row} fields and CSV records.
+   *
+   * <ul>
+   *   <li>{@link FieldType#BYTE}
+   *   <li>{@link FieldType#BOOLEAN}
+   *   <li>{@link FieldType#DATETIME}
+   *   <li>{@link FieldType#DECIMAL}
+   *   <li>{@link FieldType#DOUBLE}
+   *   <li>{@link FieldType#INT16}
+   *   <li>{@link FieldType#INT32}
+   *   <li>{@link FieldType#INT64}
+   *   <li>{@link FieldType#FLOAT}
+   *   <li>{@link FieldType#STRING}
+   * </ul>
+   */
+  public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET =
+      ImmutableSet.of(
+          FieldType.BYTE,
+          FieldType.BOOLEAN,
+          FieldType.DATETIME,
+          FieldType.DECIMAL,
+          FieldType.DOUBLE,
+          FieldType.INT16,
+          FieldType.INT32,
+          FieldType.INT64,
+          FieldType.FLOAT,
+          FieldType.STRING);
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in {@link CSVFormat} format. */
+  public static <T> Write<T> write(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<T>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** Instantiates a {@link Write} for writing {@link Row}s in {@link CSVFormat} format. */
+  public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<Row>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** {@link PTransform} for writing CSV files. */
+  @AutoValue
+  public abstract static class Write<T>
+      extends PTransform<PCollection<T>, WriteFilesResult<String>> {
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build();
+    }
+
+    /** See {@link TextIO.Write#withNoSpilling()}. */
+    public Write<T> withNoSpilling() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * TextIO.Write#withNumShards(int)}.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build();
+    }
+
+    /**
+     * Forces a single file as output and empty shard name template. See {@link
+     * TextIO.Write#withoutSharding()}.
+     */
+    public Write<T> withoutSharding() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build();
+    }
+
+    /**
+     * Uses the given {@link ShardNameTemplate} for naming output files. See {@link
+     * TextIO.Write#withShardNameTemplate(String)}.
+     */
+    public Write<T> withShardTemplate(String shardTemplate) {
+      return toBuilder()
+          .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate))
+          .build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. See {@link
+     * TextIO.Write#withSuffix(String)}.
+     */
+    public Write<T> withSuffix(String suffix) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build();
+    }
+
+    /**
+     * Sets the base directory used to generate temporary files. See {@link
+     * TextIO.Write#withTempDirectory(ResourceId)}.
+     */
+    public Write<T> withTempDirectory(ResourceId tempDirectory) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build();
+    }
+
+    /**
+     * Preserves windowing of input elements and writes them to files based on the element's window.
+     * See {@link TextIO.Write#withWindowedWrites()}.
+     */
+    public Write<T> withWindowedWrites() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build();
+    }
+
+    /**
+     * Returns a transform for writing to text files like this one but that has the given {@link
+     * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during
+     * output. See {@link
+     * TextIO.Write#withWritableByteChannelFactory(FileBasedSink.WritableByteChannelFactory)}.
+     */
+    public Write<T> withWritableByteChannelFactory(
+        FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) {
+      return toBuilder()
+          .setTextIOWrite(
+              getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory))
+          .build();
+    }
+
+    /** The underlying {@link TextIO.Write} that writes converted input to CSV formatted output. */
+    abstract TextIO.Write getTextIOWrite();
+
+    /** The {@link CSVFormat} that informs the header, header comments, and CSV row formatting. */
+    abstract CSVFormat getCSVFormat();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * The underlying {@link TextIO.Write} that writes converted input to CSV formatted output.
+       */
+      abstract Builder<T> setTextIOWrite(TextIO.Write value);
+
+      /** The {@link CSVFormat} used to convert input. Defaults to {@link CSVFormat#DEFAULT}. */
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        checkArgument(
+            !getCSVFormat().getSkipHeaderRecord(),
+            "withSkipHeaderRecord is an illegal CSVFormat setting");
+
+        if (getCSVFormat().getHeaderComments() != null) {
+          checkArgument(
+              getCSVFormat().isCommentMarkerSet(),
+              "CSVFormat withCommentMarker required when withHeaderComments");
+        }
+        return autoBuild();
+      }
+    }
+
+    @Override
+    public WriteFilesResult<String> expand(PCollection<T> input) {
+      if (!input.hasSchema()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types",
+                Write.class.getName()));
+      }
+
+      Schema schema = input.getSchema();
+
+      RowCoder rowCoder = RowCoder.of(schema);
+
+      PCollection<Row> rows =
+          input
+              .apply("To Rows", MapElements.into(rows()).via(input.getToRowFunction()))
+              .setCoder(rowCoder);
+
+      CSVFormat csvFormat = applyRequiredCSVFormatSettings(schema);
+
+      String header = formatHeader(csvFormat);
+
+      SerializableFunction<Row, String> toCsvFn =
+          CsvRowConversions.RowToCsv.builder()
+              .setCSVFormat(csvFormat)
+              .setSchema(input.getSchema())
+              .build();
+
+      PCollection<String> csv = rows.apply("To CSV", MapElements.into(strings()).via(toCsvFn));
+
+      return csv.apply("Write CSV", getTextIOWrite().withHeader(header).withOutputFilenames());
+    }
+
+    CSVFormat applyRequiredCSVFormatSettings(Schema schema) {
+      CSVFormat csvFormat = getCSVFormat().withSkipHeaderRecord();
+      if (csvFormat.getHeader() == null) {
+        csvFormat = csvFormat.withHeader(schema.sorted().getFieldNames().toArray(new String[0]));
+      }
+      return csvFormat;
+    }
+
+    private static String formatHeader(CSVFormat csvFormat) {
+      String[] header = requireNonNull(csvFormat.getHeader());
+      List<String> result = new ArrayList<>();
+      if (csvFormat.getHeaderComments() != null) {
+        for (String comment : csvFormat.getHeaderComments()) {
+          result.add(csvFormat.getCommentMarker() + " " + comment);
+        }
+      }
+      CSVFormat withoutHeaderComments = csvFormat.withHeaderComments();
+      result.add(withoutHeaderComments.format((Object[]) header));
+      return String.join("\n", result);
+    }

Review Comment:
   We are ignoring the case where the user wants to skip the header record and doesn't specify the header. We should only specify the `csvFormat` header if `withSkipHeaderRecord` is `false`.
   
   Note that we must also return `null` from formatHeader in this case, since `TextIO#withHeader` specifically says that a newline is automatically added after the header.
   
   Add a test case for this scenario.
   
   ```suggestion
       CSVFormat applyRequiredCSVFormatSettings(Schema schema) {
         CSVFormat csvFormat = getCSVFormat();
         if (!csvFormat.getSkipHeaderRecord() && csvFormat.getHeader() == null) {
           return csvFormat.withSkipHeaderRecord().withHeader(schema.sorted().getFieldNames().toArray(new String[0]));
         }
         return csvFormat;
       }
   
       private static @Nullable String formatHeader(CSVFormat csvFormat) {
      String[] header = csvFormat.getHeader();
         if (header == null) {
           return null;
         }
         List<String> result = new ArrayList<>();
         if (csvFormat.getHeaderComments() != null) {
           for (String comment : csvFormat.getHeaderComments()) {
             result.add(csvFormat.getCommentMarker() + " " + comment);
           }
         }
         CSVFormat withoutHeaderComments = csvFormat.withHeaderComments();
         result.add(withoutHeaderComments.format((Object[]) header));
         return String.join("\n", result);
       }
   ```
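   
   For the requested test, a sketch along these lines could work. Helper names such as `createFolder` and `toFilenamePrefix` come from the test class under review, `assertNotEquals` would need an extra static import, and it assumes both the suggestion above and that the downstream RowToCsv validation is relaxed to accept a format without an explicit header:
   
   ```java
   @Test
   public void skipHeaderRecordWithoutHeaderWritesNoHeaderLine() {
     File folder =
         createFolder(
             AllPrimitiveDataTypes.class.getSimpleName(),
             "skipHeaderRecordWithoutHeaderWritesNoHeaderLine");
     PCollection<Row> input =
         writePipeline.apply(
             Create.of(DATA.allPrimitiveDataTypeRows)
                 .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
     // The header a default (non-skipping) write would emit, in sorted field order.
     String header = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
   
     input.apply(
         CsvIO.writeRows(toFilenamePrefix(folder), CSVFormat.DEFAULT.withSkipHeaderRecord())
             .withNumShards(1));
     writePipeline.run().waitUntilFinish();
   
     PCollection<String> lines =
         readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*"));
     PAssert.that(lines)
         .satisfies(
             (Iterable<String> itr) -> {
               // No line of any shard should be the header.
               for (String line : itr) {
                 assertNotEquals(header, line);
               }
               return null;
             });
     readPipeline.run();
   }
   ```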



##########
sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOWriteTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.io.csv.CsvIOTestData.DATA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.AllPrimitiveDataTypes;
+import static org.apache.beam.sdk.io.csv.CsvIOTestJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.commons.csv.CSVFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CsvIO.Write}. */
+@RunWith(JUnit4.class)
+public class CsvIOWriteTest {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void headersWithCommentsWrittenFirstOnEachShard() {
+    File folder =
+        createFolder(
+            AllPrimitiveDataTypes.class.getSimpleName(),
+            "headersWithCommentsWrittenFirstOnEachShard");
+
+    PCollection<Row> input =
+        writePipeline.apply(
+            Create.of(DATA.allPrimitiveDataTypeRows)
+                .withRowSchema(ALL_PRIMITIVE_DATA_TYPES_SCHEMA));
+    String expectedHeader = "aBoolean,aByte,aDecimal,aDouble,aFloat,aLong,aShort,aString,anInteger";
+    CSVFormat csvFormat =
+        CSVFormat.DEFAULT.withHeaderComments("foo", "bar", "baz").withCommentMarker('#');
+
+    input.apply(CsvIO.writeRows(toFilenamePrefix(folder), csvFormat).withNumShards(3));
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<FileIO.ReadableFile> files =
+        readPipeline
+            .apply(FileIO.match().filepattern(toFilenamePrefix(folder) + "*"))
+            .apply(FileIO.readMatches());
+    PAssert.that(files)
+        .satisfies(
+            (Iterable<FileIO.ReadableFile> itr) -> {
+              Iterable<FileIO.ReadableFile> safeItr = requireNonNull(itr);
+              for (FileIO.ReadableFile file : safeItr) {
+                try {
+                  List<String> lines = Splitter.on('\n').splitToList(file.readFullyAsUTF8String());
+                  assertFalse(lines.isEmpty());
+                  assertEquals("# foo", lines.get(0));
+                  assertEquals("# bar", lines.get(1));
+                  assertEquals("# baz", lines.get(2));
+                  assertEquals(expectedHeader, lines.get(3));
+
+                  assertTrue(
+                      lines.subList(4, lines.size()).stream().noneMatch(expectedHeader::equals));
+

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * The underlying {@link FileIO.Write} that writes converted input to CSV formatted output.
+       */
+      abstract Builder<T> setTextIOWrite(TextIO.Write value);
+
+      /** The {@link CSVFormat} to convert input. Defaults to {@link CSVFormat#DEFAULT}. */
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        checkArgument(
+            !getCSVFormat().getSkipHeaderRecord(),

Review Comment:
   You specify a few other properties in the javadoc as unsupported; should we check for them here as well? Alternatively, since CSVFormat has a mix of properties used during reading, writing, or both, we could skip the checks here and instead list, in the javadoc, the properties that are ignored during writing; that way a person can use the same CSVFormat object for reading and writing.
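   
   For reference, a single write-side validator covering everything the javadoc lists might look like this (a sketch; the method name and placement are assumptions, while the individual checks mirror the ones already in CsvRowConversions):
   
   ```java
   // Sketch: reject, in one place, every CSVFormat property the javadoc lists
   // as unsupported for writing.
   private static void validateWriteFormat(CSVFormat csvFormat) {
     checkArgument(
         !csvFormat.getSkipHeaderRecord(),
         "withSkipHeaderRecord is an illegal CSVFormat setting");
     checkArgument(
         !csvFormat.getAllowMissingColumnNames(),
         "withAllowMissingColumnNames is an illegal CSVFormat setting");
     checkArgument(!csvFormat.getAutoFlush(), "withAutoFlush is an illegal CSVFormat setting");
     checkArgument(
         !csvFormat.getIgnoreHeaderCase(),
         "withIgnoreHeaderCase is an illegal CSVFormat setting");
     checkArgument(
         !csvFormat.getIgnoreSurroundingSpaces(),
         "withIgnoreSurroundingSpaces is an illegal CSVFormat setting");
   }
   ```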



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.apache.beam.sdk.io.csv.CsvIO.VALID_FIELD_TYPE_SET;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.csv.CSVFormat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/** Contains classes and methods to help with converting between {@link Row} and CSV strings. */
+class CsvRowConversions {
+
+  /** Converts between {@link Row} and CSV string using a {@link CSVFormat}. */
+  @AutoValue
+  abstract static class RowToCsv implements SerializableFunction<Row, String> {
+
+    static Builder builder() {
+      return new AutoValue_CsvRowConversions_RowToCsv.Builder();
+    }
+
+    /** The expected {@link Schema} of the {@link Row} input. */
+    abstract Schema getSchema();
+
+    /** The {@link CSVFormat} of the converted {@link Row} input. */
+    abstract CSVFormat getCSVFormat();
+
+    /** Converts a {@link Row} to a CSV string formatted using {@link #getCSVFormat()}. */
+    @Override
+    public String apply(Row input) {
+      Row safeInput = checkNotNull(input);
+      String[] header = getHeader();
+      Object[] values = new Object[header.length];
+      for (int i = 0; i < header.length; i++) {
+        values[i] = safeInput.getValue(header[i]);
+      }
+      return getCSVFormat().format(values);
+    }
+
+    @NonNull
+    String[] getHeader() {
+      return checkNotNull(getCSVFormat().getHeader());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /** The expected {@link Schema} of the {@link Row} input. */
+      abstract Builder setSchema(Schema schema);
+
+      abstract Schema getSchema();
+
+      /** The {@link CSVFormat} of the converted {@link Row} input. */
+      abstract Builder setCSVFormat(CSVFormat format);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract RowToCsv autoBuild();
+
+      final RowToCsv build() {
+        checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields");
+        setCSVFormat(
+            getCSVFormat()
+                .withSkipHeaderRecord()
+                // Delegate to TextIO.Write.withDelimiter instead.
+                .withRecordSeparator((char) 32)
+                .withHeaderComments());
+        validateCSVFormat(getCSVFormat());
+        validateHeaderAgainstSchema(getCSVFormat().getHeader(), getSchema());
+
+        return autoBuild();
+      }
+    }
+  }
+
+  private static void validateCSVFormat(CSVFormat csvFormat) {
+    String[] header = checkNotNull(csvFormat.getHeader(), "CSVFormat withHeader is required");
+
+    checkArgument(header.length > 0, "CSVFormat withHeader requires at least one column");
+
+    checkArgument(!csvFormat.getAutoFlush(), "withAutoFlush is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getIgnoreHeaderCase(), "withIgnoreHeaderCase is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getAllowMissingColumnNames(),
+        "withAllowMissingColumnNames is an illegal CSVFormat setting");
+
+    checkArgument(
+        !csvFormat.getIgnoreSurroundingSpaces(),
+        "withIgnoreSurroundingSpaces is an illegal CSVFormat setting");
+  }
+
+  private static void validateHeaderAgainstSchema(String[] csvHeader, Schema schema) {
+    Set<String> distinctColumns = Stream.of(csvHeader).collect(Collectors.toSet());

Review Comment:
   ```suggestion
       Set<String> distinctColumns = new HashSet<>(Arrays.asList(csvHeader));
   ```
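   
   (Note that `HashSet` and `Arrays` would also need the corresponding `java.util` imports.)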



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+ * <p>The resulting CSV files will look like the following where the header is repeated for every
+ * shard file, whereas by default, {@link CsvIO.Write} will write all fields in <b>sorted order</b>

Review Comment:
   ```suggestion
    * file, whereas by default, {@link CsvIO.Write} will write all fields in <b>sorted order</b>
   ```



##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvRowConversions.java:
##########
@@ -0,0 +1,148 @@
+      final RowToCsv build() {
+        checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields");
+        setCSVFormat(
+            getCSVFormat()
+                .withSkipHeaderRecord()

Review Comment:
   Don't we do this in CsvIO already? Why repeat it here?
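   
   If CsvIO remains the single place that applies it, a deduplicated build() here might look like this (a sketch under that assumption):
   
   ```java
   final RowToCsv build() {
     checkArgument(getSchema().getFieldCount() > 0, "Schema has no fields");
     // Assumes CsvIO already applied withSkipHeaderRecord() upstream; keep only
     // the per-record formatting adjustments RowToCsv itself needs.
     setCSVFormat(
         getCSVFormat()
             // Delegate record separation to TextIO.Write.withDelimiter instead.
             .withRecordSeparator(' ')
             .withHeaderComments());
     validateCSVFormat(getCSVFormat());
     validateHeaderAgainstSchema(getCSVFormat().getHeader(), getSchema());
     return autoBuild();
   }
   ```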



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org