Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:39 UTC

[4/6] beam git commit: Better-organized javadocs for TextIO and AvroIO

Better-organized javadocs for TextIO and AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84eb7f3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84eb7f3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84eb7f3a

Branch: refs/heads/master
Commit: 84eb7f3ae431b467828a76e305123601d4ee333a
Parents: 184f7a9
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:29:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 83 +++++++++++++-------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 30 ++++---
 2 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9e0422e..d4a7cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -57,13 +57,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
+ * <h2>Reading Avro files</h2>
+ *
  * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
- * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
- * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
- * themselves in a {@link PCollection}, apply {@link #readAll}.
+ * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the
+ * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link
+ * PCollection}, apply {@link #readAll}. If the schema is unknown at pipeline construction time, use
+ * {@link #parseGenericRecords} or {@link #parseAllGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
+ * <h3>Reading records of a known schema</h3>
+ *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
  * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
  * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
@@ -71,26 +78,34 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
  * #readAllGenericRecords}.
  *
- * <p>To read records from files whose schema is unknown at pipeline construction time or differs
- * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
- * parsing function for converting each {@link GenericRecord} into a value of your custom type.
- * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
- * #parseAllGenericRecords}.
- *
  * <p>For example:
  *
  * <pre>{@code
  * Pipeline p = ...;
  *
- * // A simple Read of a local file (only runs locally):
+ * // Read Avro-generated classes from files on GCS
  * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
  *
- * // A Read from a GCS file (runs locally and using remote execution):
+ * // Read GenericRecords of the given schema from files on GCS
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
  *
  * PCollection<Foo> records =
  *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
@@ -101,12 +116,7 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     }));
  * }</pre>
  *
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
- * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
- * performance if the filepattern matches only a small number of files.
- *
- * <p>Reading from a {@link PCollection} of filepatterns:
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
  *
  * <pre>{@code
  * Pipeline p = ...;
@@ -120,6 +130,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
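+ *
+ * <p>For example, a minimal sketch of applying the hint to a read (the filepattern is
+ * illustrative):
+ *
+ * <pre>{@code
+ * PCollection<AvroAutoGenClass> records =
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class)
+ *         .from("gs://my_bucket/path/to/records-*.avro")
+ *         .withHintMatchesManyFiles());
+ * }</pre>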
+ *
+ * <h2>Writing Avro files</h2>
+ *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
  * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
  * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -128,13 +147,11 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
  * custom file naming policy.
  *
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
- * and unique windows and triggers must produce unique filenames.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using {@link
+ * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
+ * overridden using {@link AvroIO.Write#withCodec}.
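+ *
+ * <p>For example, a minimal sketch switching to Snappy compression (the output prefix is
+ * illustrative):
+ *
+ * <pre>{@code
+ * records.apply(AvroIO.write(AvroAutoGenClass.class)
+ *     .to("gs://my_bucket/path/to/records")
+ *     .withCodec(CodecFactory.snappyCodec()));
+ * }</pre>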
+ *
+ * <h3>Writing specific or generic records</h3>
  *
  * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
  * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
@@ -157,6 +174,18 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     .withSuffix(".avro"));
  * }</pre>
  *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be
+ * set, and unique windows and triggers must produce unique filenames.
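+ *
+ * <p>For example, a minimal sketch of a windowed write (window and trigger setup not shown;
+ * WindowedPolicy stands in for a user-defined {@link FileBasedSink.FilenamePolicy}):
+ *
+ * <pre>{@code
+ * windowedRecords.apply(AvroIO.write(AvroAutoGenClass.class)
+ *     .to(new WindowedPolicy(...))
+ *     .withWindowedWrites()
+ *     .withNumShards(3));
+ * }</pre>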
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
  * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
  * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
  * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
@@ -201,10 +230,6 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
  *     .to(new UserDynamicAvros()));
  * }</pre>
- *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
- * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
- * overridden using {@link AvroIO.Write#withCodec}.
  */
 public class AvroIO {
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 835008f..442e4d9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -56,6 +56,8 @@ import org.joda.time.Duration;
 /**
  * {@link PTransform}s for reading and writing text files.
  *
+ * <h2>Reading text files</h2>
+ *
  * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
  * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
@@ -64,6 +66,8 @@ import org.joda.time.Duration;
  * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
  * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
  *
+ * <h3>Filepattern expansion and watching</h3>
+ *
  * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link
  * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s).
  *
@@ -81,11 +85,6 @@ import org.joda.time.Duration;
  * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small number
- * of files.
- *
  * <p>Example 2: reading a PCollection of filenames.
  *
  * <pre>{@code
@@ -113,6 +112,15 @@ import org.joda.time.Duration;
  *       afterTimeSinceNewOutput(Duration.standardHours(1))));
  * }</pre>
  *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
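+ *
+ * <p>For example, a minimal sketch of applying the hint to a read (the filepattern is
+ * illustrative):
+ *
+ * <pre>{@code
+ * PCollection<String> lines =
+ *     p.apply(TextIO.read()
+ *         .from("gs://my_bucket/path/to/lines-*.txt")
+ *         .withHintMatchesManyFiles());
+ * }</pre>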
+ *
+ * <h2>Writing text files</h2>
+ *
  * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
@@ -130,6 +138,13 @@ import org.joda.time.Duration;
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
  *
+ * <p>Any existing files with the same names as generated output files will be overwritten.
+ *
+ * <p>If you want better control over how filenames are generated than the default policy allows, a
+ * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
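+ *
+ * <p>For example, a minimal sketch (MyPolicy stands in for a user-defined {@link FilenamePolicy}
+ * subclass):
+ *
+ * <pre>{@code
+ * lines.apply(TextIO.write().to(new MyPolicy(...)));
+ * }</pre>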
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
  * will cause windowing and triggering to be preserved. When producing windowed writes with a
@@ -140,8 +155,7 @@ import org.joda.time.Duration;
  * for the window and the pane; W is expanded into the window text, and P into the pane; the default
  * template will include both the window and the pane in the filename.
  *
- * <p>If you want better control over how filenames are generated than the default policy allows, a
- * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
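+ *
+ * <p>For example, a minimal sketch of a windowed write using the default filename policy (window
+ * and trigger setup not shown; the output prefix is illustrative):
+ *
+ * <pre>{@code
+ * windowedLines.apply(TextIO.write()
+ *     .to("gs://my_bucket/path/to/lines")
+ *     .withWindowedWrites()
+ *     .withNumShards(3));
+ * }</pre>
+ *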
+ * <h3>Writing data to multiple destinations</h3>
  *
  * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
  * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
@@ -166,8 +180,6 @@ import org.joda.time.Duration;
  *       }),
  *       new Params().withBaseFilename(baseDirectory + "/empty");
  * }</pre>
- *
- * <p>Any existing files with the same names as generated output files will be overwritten.
  */
 public class TextIO {
   /**