Posted to github@beam.apache.org by "damondouglas (via GitHub)" <gi...@apache.org> on 2023/01/24 17:58:06 UTC

[GitHub] [beam] damondouglas commented on a diff in pull request #24630: [24469] Implement CsvIO.Write and supporting classes

damondouglas commented on code in PR #24630:
URL: https://github.com/apache/beam/pull/24630#discussion_r1085693708


##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * {@link PTransform}s for reading and writing CSV files.
+ *
+ * <h2>Reading CSV files</h2>
+ *
+ * <p>Reading from CSV files is not yet implemented. Please see <a
+ * href="https://github.com/apache/beam/issues/24552">https://github.com/apache/beam/issues/24552</a>.
+ *
+ * <h2>Writing CSV files</h2>
+ *
+ * <p>To write a {@link PCollection} to one or more CSV files, use {@link CsvIO.Write}, obtained via
+ * {@link CsvIO#writeRows(String, CSVFormat)} or {@link CsvIO#write(String, CSVFormat)}. {@link
+ * CsvIO.Write} supports writing {@link Row} or custom Java types using an inferred {@link Schema}.
+ * Examples below show both scenarios. See the Beam Programming Guide on <a
+ * href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring
+ * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ * <p>{@link CsvIO.Write} only supports writing {@link Schema}-aware types that do not contain any
+ * nested {@link FieldType}s, such as {@link org.apache.beam.sdk.schemas.Schema.TypeName#ROW} or
+ * repeated {@link org.apache.beam.sdk.schemas.Schema.TypeName#ARRAY} types. See {@link
+ * CsvIO#VALID_FIELD_TYPE_SET} for valid {@link FieldType}s.
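+ *
+ * <p>For example, a hypothetical {@code Order} type like the following would be rejected, because
+ * its {@code List} property maps to a repeated {@code ARRAY} field in the inferred {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Order {
+ *   public String getOrderId() { … }
+ *   public void setOrderId(String orderId) { … }
+ *   // A List maps to an ARRAY field, which CsvIO.Write does not support.
+ *   public List<String> getItemIds() { … }
+ *   public void setItemIds(List<String> itemIds) { … }
+ * }
+ *
+ * // Applying CsvIO.<Order>write(...) to a PCollection<Order> is expected to be rejected
+ * // at pipeline construction time.
+ * }</pre>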
+ *
+ * <h3>Example usage:</h3>
+ *
+ * <p>Suppose we have the following <code>Transaction</code> class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ * <pre>{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }</pre>
+ *
+ * <p>From a {@code PCollection<Transaction>}, {@link CsvIO.Write} can write one or more CSV files,
+ * automatically creating the header based on the inferred {@link Schema}.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for every
+ * shard file. Note that by default, {@link CsvIO.Write} writes all fields in <b>sorted order</b> of
+ * the field names.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>To control the order and subset of fields that {@link CsvIO.Write} writes, use {@link
+ * CSVFormat#withHeader(String...)}. Note, however, the following constraints:
+ *
+ * <ol>
+ *   <li>Each header column must match a field name in the {@link Schema}; matching is
+ *       case-sensitive.
+ *   <li>Matching header columns must correspond to {@link Schema} fields with valid {@link
+ *       FieldType}s; see {@link #VALID_FIELD_TYPE_SET}.
+ *   <li>{@link CSVFormat} only allows repeated header columns when {@link
+ *       CSVFormat#withAllowDuplicateHeaderNames()} is set.
+ * </ol>
+ *
+ * <p>The following example shows the use of {@link CSVFormat#withHeader(String...)} to control the
+ * order and subset of <code>Transaction</code> fields.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(
+ *  CsvIO
+ *    .<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withHeader("transactionId", "purchaseAmount"))
+ * );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following, where the header is repeated for every
+ * shard file but includes only the specified subset of fields, in the order listed.
+ *
+ * <pre>{@code
+ * transactionId,purchaseAmount
+ * 12345,10.23
+ * 54321,54.65
+ * 98765,11.76
+ * }</pre>
+ *
+ * <p>In addition to header customization, {@link CsvIO.Write} supports {@link
+ * CSVFormat#withHeaderComments(Object...)} as shown below. Note that {@link
+ * CSVFormat#withCommentMarker(char)} is required when specifying header comments.
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions
+ *    .apply(
+ *        CsvIO.<Transaction>write("path/to/folder/prefix",
+ *            CSVFormat.DEFAULT.withCommentMarker('#').withHeaderComments("Bank Report", "1970-01-01", "Operator: John Doe"))
+ *    );
+ * }</pre>
+ *
+ * <p>The resulting CSV files will look like the following where the header and header comments are
+ * repeated for every shard file.
+ *
+ * <pre>{@code
+ * # Bank Report
+ * # 1970-01-01
+ * # Operator: John Doe
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
+ *
+ * <p>A {@link PCollection} of {@link Row}s works just like the custom Java types illustrated above,
+ * except we use {@link CsvIO#writeRows(String, CSVFormat)}, as shown below for the same {@code
+ * Transaction} class. We derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes only. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction(TypeDescriptor)},
+ * as sketched after the example output below.
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", "12345")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", "54321")
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", "98765")
+ *    .build())
+ *    .withRowSchema(schema)
+ * );
+ *
+ * transactions.apply(
+ *  CsvIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix", CSVFormat.DEFAULT)
+ * );
+ * }</pre>
+ *
+ * <p>Writing the transactions {@link PCollection} of {@link Row}s would yield the following CSV
+ * file content.
+ *
+ * <pre>{@code
+ * bank,purchaseAmount,transactionId
+ * A,10.23,12345
+ * B,54.65,54321
+ * C,11.76,98765
+ * }</pre>
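+ *
+ * <p>Rather than hard-coding {@link Row}s, a pipeline would typically convert its custom type into
+ * {@link Row}s using the provider's to-row function. The following is a minimal sketch, assuming
+ * the {@code Transaction} class shown above and an existing {@code PCollection<Transaction>
+ * transactions}:
+ *
+ * <pre>{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * SerializableFunction<Transaction, Row> toRowFn =
+ *     defaultSchemaProvider.toRowFunction(TypeDescriptor.of(Transaction.class));
+ *
+ * // Convert Transactions to Rows, attaching the derived schema to the output PCollection.
+ * PCollection<Row> rows =
+ *     transactions
+ *         .apply(MapElements.into(TypeDescriptors.rows()).via(toRowFn))
+ *         .setRowSchema(schema);
+ *
+ * rows.apply(CsvIO.writeRows("path/to/folder/prefix", CSVFormat.DEFAULT));
+ * }</pre>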
+ *
+ * <p>{@link CsvIO.Write} does not support the following {@link CSVFormat} properties and throws an
+ * {@link IllegalArgumentException} if any of them is set; see the example following this list.
+ *
+ * <ul>
+ *   <li>{@link CSVFormat#withSkipHeaderRecord()}
+ *   <li>{@link CSVFormat#withAllowMissingColumnNames()}
+ *   <li>{@link CSVFormat#withAutoFlush(boolean)}
+ *   <li>{@link CSVFormat#withIgnoreHeaderCase()}
+ *   <li>{@link CSVFormat#withIgnoreSurroundingSpaces()}
+ * </ul>
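+ *
+ * <p>For example, a configuration like the following is rejected with an {@link
+ * IllegalArgumentException}:
+ *
+ * <pre>{@code
+ * PCollection<Transaction> transactions = ...
+ * // Throws IllegalArgumentException because withIgnoreHeaderCase is not supported.
+ * transactions.apply(
+ *     CsvIO.<Transaction>write("path/to/folder/prefix", CSVFormat.DEFAULT.withIgnoreHeaderCase()));
+ * }</pre>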
+ */
+public class CsvIO {
+  /**
+   * The {@link Schema.FieldType}s that {@link CsvIO} supports when converting between CSV records
+   * and {@link Row} fields.
+   *
+   * <ul>
+   *   <li>{@link FieldType#BYTE}
+   *   <li>{@link FieldType#BOOLEAN}
+   *   <li>{@link FieldType#DATETIME}
+   *   <li>{@link FieldType#DECIMAL}
+   *   <li>{@link FieldType#DOUBLE}
+   *   <li>{@link FieldType#INT16}
+   *   <li>{@link FieldType#INT32}
+   *   <li>{@link FieldType#INT64}
+   *   <li>{@link FieldType#FLOAT}
+   *   <li>{@link FieldType#STRING}
+   * </ul>
+   */
+  public static final Set<Schema.FieldType> VALID_FIELD_TYPE_SET =
+      ImmutableSet.of(
+          FieldType.BYTE,
+          FieldType.BOOLEAN,
+          FieldType.DATETIME,
+          FieldType.DECIMAL,
+          FieldType.DOUBLE,
+          FieldType.INT16,
+          FieldType.INT32,
+          FieldType.INT64,
+          FieldType.FLOAT,
+          FieldType.STRING);
+
+  static final String DEFAULT_FILENAME_SUFFIX = ".csv";
+
+  /** Instantiates a {@link Write} for writing user types in {@link CSVFormat} format. */
+  public static <T> Write<T> write(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<T>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** Instantiates a {@link Write} for writing {@link Row}s in {@link CSVFormat} format. */
+  public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
+    return new AutoValue_CsvIO_Write.Builder<Row>()
+        .setTextIOWrite(createDefaultTextIOWrite(to))
+        .setCSVFormat(csvFormat)
+        .build();
+  }
+
+  /** {@link PTransform} for writing CSV files. */
+  @AutoValue
+  public abstract static class Write<T>
+      extends PTransform<PCollection<T>, WriteFilesResult<String>> {
+
+    /** Specifies the {@link Compression} of all generated shard files. */
+    public Write<T> withCompression(Compression compression) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build();
+    }
+
+    /** See {@link TextIO.Write#withNoSpilling()}. */
+    public Write<T> withNoSpilling() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. See {@link
+     * TextIO.Write#withNumShards(int)}.
+     */
+    public Write<T> withNumShards(Integer numShards) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build();
+    }
+
+    /**
+     * Forces a single file as output and empty shard name template. See {@link
+     * TextIO.Write#withoutSharding()}.
+     */
+    public Write<T> withoutSharding() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build();
+    }
+
+    /**
+     * Uses the given {@link ShardNameTemplate} for naming output files. See {@link
+     * TextIO.Write#withShardNameTemplate(String)}.
+     */
+    public Write<T> withShardTemplate(String shardTemplate) {
+      return toBuilder()
+          .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate))
+          .build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. See {@link
+     * TextIO.Write#withSuffix(String)}.
+     */
+    public Write<T> withSuffix(String suffix) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build();
+    }
+
+    /**
+     * Sets the base directory used to generate temporary files. See {@link
+     * TextIO.Write#withTempDirectory(ResourceId)}.
+     */
+    public Write<T> withTempDirectory(ResourceId tempDirectory) {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build();
+    }
+
+    /**
+     * Preserves windowing of input elements and writes them to files based on the element's window.
+     * See {@link TextIO.Write#withWindowedWrites()}.
+     */
+    public Write<T> withWindowedWrites() {
+      return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build();
+    }
+
+    /**
+     * Returns a transform for writing to text files like this one but that has the given {@link
+     * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during
+     * output. See {@link
+     * TextIO.Write#withWritableByteChannelFactory(FileBasedSink.WritableByteChannelFactory)}.
+     */
+    public Write<T> withWritableByteChannelFactory(
+        FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) {
+      return toBuilder()
+          .setTextIOWrite(
+              getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory))
+          .build();
+    }
+
+    /** The underlying {@link TextIO.Write} that writes converted input to CSV-formatted output. */
+    abstract TextIO.Write getTextIOWrite();
+
+    /** The {@link CSVFormat} that determines the header, header comments, and CSV row formatting. */
+    abstract CSVFormat getCSVFormat();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      /**
+       * The underlying {@link TextIO.Write} that writes converted input to CSV-formatted output.
+       */
+      abstract Builder<T> setTextIOWrite(TextIO.Write value);
+
+      /** The {@link CSVFormat} used to convert input. Defaults to {@link CSVFormat#DEFAULT}. */
+      abstract Builder<T> setCSVFormat(CSVFormat value);
+
+      abstract CSVFormat getCSVFormat();
+
+      abstract Write<T> autoBuild();
+
+      final Write<T> build() {
+        checkArgument(
+            !getCSVFormat().getSkipHeaderRecord(),

Review Comment:
   CsvRowConversions performs many of these checks because they most directly impact RowToCsv conversion. Although it is package-private, I thought it best to perform the checks closest to where they would affect the outcome.
   
   Additionally, does the comment on lines 375 to 377 clarify why we check the skipHeaderRecord property and throw an exception?
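   
   For reference, a minimal sketch (not the exact code in this PR) of the kind of builder-time check under discussion:
   
   ```java
   // Sketch only: reject CSVFormat#withSkipHeaderRecord() when building CsvIO.Write,
   // since the header is always written for every shard file.
   checkArgument(
       !getCSVFormat().getSkipHeaderRecord(),
       "CSVFormat withSkipHeaderRecord is not supported by CsvIO.Write");
   ```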


