Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/27 21:57:47 UTC

[2/5] beam-site git commit: Update according to reviewer comments

Update according to reviewer comments


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bc4d4947
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bc4d4947
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bc4d4947

Branch: refs/heads/asf-site
Commit: bc4d4947461244eee83d7c0f31e76cda9971b5cb
Parents: 884971d
Author: Hadar Hod <ha...@google.com>
Authored: Tue Jan 24 23:07:48 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jan 27 13:42:01 2017 -0800

----------------------------------------------------------------------
 src/documentation/programming-guide.md | 135 ++++++++++------------------
 1 file changed, 48 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/bc4d4947/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 3851bb5..c0e947f 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -69,7 +69,7 @@ A typical Beam driver program works as follows:
 
 When you run your Beam driver program, the Pipeline Runner that you designate constructs a **workflow graph** of your pipeline based on the `PCollection` objects you've created and transforms that you've applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous "job" (or equivalent) on that back-end.
 
-## <a name="pipeline"></a>Creating the Pipeline
+## <a name="pipeline"></a>Creating the pipeline
 
 The `Pipeline` abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a <span class="language-java">[Pipeline]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/Pipeline.html)</span><span class="language-py">[Pipeline](https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/pipeline.py)</span> object, and then using that object as the basis for creating the pipeline's data sets as `PCollection`s and its operations as `Transform`s.
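
 For example, a minimal Java driver program might begin like this (a sketch only, assuming the pipeline options come from command-line arguments):
 
 ```java
 // A minimal sketch: build PipelineOptions from command-line args, then create the Pipeline.
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
 Pipeline p = Pipeline.create(options);
 ```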
 
@@ -110,7 +110,7 @@ After you've created your `Pipeline`, you'll need to begin by creating at least
 
 You create a `PCollection` either by reading data from an external source using Beam's [Source API](#io), or by creating a `PCollection` of data stored in an in-memory collection class in your driver program. The former is typically how a production pipeline would ingest data; Beam's Source APIs contain adapters to help you read from external sources like large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging purposes.
 
-#### Reading from an External Source
+#### Reading from an external source
 
 To read from an external source, you use one of the [Beam-provided I/O adapters](#io). The adapters vary in their exact usage, but all of them read from some external data source and return a `PCollection` whose elements represent the data records in that source.
 
@@ -141,7 +141,7 @@ lines = p | 'ReadMyFile' >> beam.io.Read(beam.io.TextFileSource("protocol://path
 
 See the [section on I/O](#io) to learn more about how to read from the various data sources supported by the Beam SDK.
 
-#### Creating a PCollection from In-Memory Data
+#### Creating a PCollection from in-memory data
 
 {:.language-java}
 To create a `PCollection` from an in-memory Java `Collection`, you use the Beam-provided `Create` transform. Much like a data adapter's `Read`, you apply `Create` directly to your `Pipeline` object itself.
@@ -190,11 +190,11 @@ p = beam.Pipeline()
 collection = p | 'ReadMyLines' >> beam.Create(lines)
 ```
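
 For comparison, a minimal Java sketch of the same idea, using the `Create` transform on an in-memory `List` (the example lines are illustrative):
 
 ```java
 // A sketch: build a PCollection from an in-memory List using Create.
 List<String> lines = Arrays.asList(
     "To be, or not to be: that is the question: ",
     "Whether 'tis nobler in the mind to suffer ");
 PCollection<String> collection = p.apply(Create.of(lines)).setCoder(StringUtf8Coder.of());
 ```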
 
-### <a name="pccharacteristics"></a>PCollection Characteristics
+### <a name="pccharacteristics"></a>PCollection characteristics
 
 A `PCollection` is owned by the specific `Pipeline` object for which it is created; multiple pipelines cannot share a `PCollection`. In some respects, a `PCollection` functions like a collection class. However, a `PCollection` can differ in a few key ways:
 
-#### <a name="pcelementtype"></a>Element Type
+#### <a name="pcelementtype"></a>Element type
 
 The elements of a `PCollection` may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed.
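
 For example, in the Java SDK you can override the inferred encoding by setting a coder explicitly (a sketch; `VarIntCoder` is one of the built-in coders):
 
 ```java
 // Sketch: explicitly set the coder used to encode a PCollection's elements.
 PCollection<Integer> numbers = ...;
 numbers.setCoder(VarIntCoder.of());
 ```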
 
@@ -202,11 +202,11 @@ The elements of a `PCollection` may be of any type, but must all be of the same
 
 A `PCollection` is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a `PCollection` and generate new pipeline data (as a new `PCollection`), *but it does not consume or modify the original input collection*.
 
-#### <a name="pcrandomaccess"></a>Random Access
+#### <a name="pcrandomaccess"></a>Random access
 
 A `PCollection` does not support random access to individual elements. Instead, Beam Transforms consider every element in a `PCollection` individually.
 
-#### <a name="pcsizebound"></a>Size and Boundedness
+#### <a name="pcsizebound"></a>Size and boundedness
 
 A `PCollection` is a large, immutable "bag" of elements. There is no upper limit on how many elements a `PCollection` can contain; any given `PCollection` might fit in memory on a single machine, or it might represent a very large distributed data set backed by a persistent data store.
 
@@ -216,7 +216,7 @@ The bounded (or unbounded) nature of your `PCollection` affects how Beam process
 
 When performing an operation that groups elements in an unbounded `PCollection`, Beam requires a concept called **Windowing** to divide a continuously updating data set into logical windows of finite size.  Beam processes each window as a bundle, and processing continues as the data set is generated. These logical windows are determined by some characteristic associated with a data element, such as a **timestamp**.
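
 As an illustrative sketch, you might divide an unbounded `PCollection` into fixed five-minute windows before grouping (assuming the Java SDK's `Window` and `FixedWindows`):
 
 ```java
 // Sketch: window an unbounded PCollection into fixed 5-minute windows by element timestamp.
 PCollection<String> windowedItems = items.apply(
     Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
 ```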
 
-#### <a name="pctimestamps"></a>Element Timestamps
+#### <a name="pctimestamps"></a>Element timestamps
 
 Each element in a `PCollection` has an associated intrinsic **timestamp**. The timestamp for each element is initially assigned by the [Source](#io) that creates the `PCollection`. Sources that create an unbounded `PCollection` often assign each new element a timestamp that corresponds to when the element was read or added.
 
@@ -226,7 +226,7 @@ Timestamps are useful for a `PCollection` that contains elements with an inheren
 
 You can manually assign timestamps to the elements of a `PCollection` if the source doesn't do it for you. You'll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a "time" field in a server log entry). Beam has [Transforms](#transforms) that take a `PCollection` as input and output an identical `PCollection` with timestamps attached; see [Assigning Timestamps](#windowing) for more information on how to do so.
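
 For instance, a sketch of attaching timestamps drawn from a field of each element (here `LogEntry` and `getTimestampMillis` are hypothetical names used only for illustration):
 
 ```java
 // Sketch: assign each element a timestamp taken from the element itself.
 PCollection<LogEntry> stampedLogs = unstampedLogs.apply(
     WithTimestamps.of((LogEntry entry) -> new Instant(entry.getTimestampMillis())));
 ```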
 
-## <a name="transforms"></a>Applying Transforms
+## <a name="transforms"></a>Applying transforms
 
 In the Beam SDKs, **transforms** are the operations in your pipeline. A transform takes a `PCollection` (or more than one `PCollection`) as input, performs an operation that you specify on each element in that collection, and produces a new output `PCollection`. To invoke a transform, you must **apply** it to the input `PCollection`.
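
 For example, a sketch of applying a Beam-provided transform to a `PCollection` of strings:
 
 ```java
 // Sketch: apply the Count transform to an input PCollection.
 PCollection<String> words = ...;
 PCollection<Long> totalWords = words.apply(Count.<String>globally());
 ```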
 
@@ -277,7 +277,7 @@ You can also build your own [composite transforms](#transforms-composite) that n
 
 The transforms in the Beam SDKs provide a generic **processing framework**, where you provide processing logic in the form of a function object (colloquially referred to as "user code"). The user code gets applied to the elements of the input `PCollection`. Instances of your user code might then be executed in parallel by many different workers across a cluster, depending on the pipeline runner and back-end that you choose to execute your Beam pipeline. The user code running on each worker generates the output elements that are ultimately added to the final output `PCollection` that the transform produces.
 
-### Core Beam Transforms
+### Core Beam transforms
 
 Beam provides the following transforms, each of which represents a different processing paradigm:
 
@@ -390,7 +390,7 @@ In your processing method, you'll also need to meet some immutability requiremen
 * Once you output a value using `ProcessContext.output()` or `ProcessContext.sideOutput()`, you should not modify that value in any way.
 
 
-##### Lightweight DoFns and Other Abstractions
+##### Lightweight DoFns and other abstractions
 
 If your function is relatively straightforward, you can simplify your use of `ParDo` by providing a lightweight `DoFn` in-line, as <span class="language-java">an anonymous inner class instance</span><span class="language-py">a lambda function</span>.
 
@@ -403,9 +403,8 @@ PCollection<String> words = ...;
 // Apply a ParDo with an anonymous DoFn to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  ParDo
-    .named("ComputeWordLengths")            // the transform name
-    .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
+  "ComputeWordLengths",                     // the transform name
+  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
       @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element().length());
@@ -497,7 +496,7 @@ When you apply a `Combine` transform, you must provide the function that contain
 
 Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of `CombineFn` that has an accumulation type distinct from the input/output type.
 
-##### **Simple Combinations Using Simple Functions**
+##### **Simple combinations using simple functions**
 
 The following example code shows a simple combine function.
 
@@ -519,7 +518,7 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
 {% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_bounded_sum
 %}```
 
-##### **Advanced Combinations using CombineFn**
+##### **Advanced combinations using CombineFn**
 
 For more complex combine functions, you can define a subclass of `CombineFn`. You should use `CombineFn` if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.
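
 For illustration, a sketch of such a subclass that computes a mean of integer values using a custom accumulator (assuming the Java SDK's `CombineFn`; the class and field names are illustrative):
 
 ```java
 // A sketch of a CombineFn with a custom accumulator type, computing a mean of Integers.
 // The accumulator implements Serializable so that a default coder can be inferred for it.
 public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public static class Accum implements Serializable {
     int sum = 0;
     int count = 0;
   }
 
   @Override
   public Accum createAccumulator() { return new Accum(); }
 
   @Override
   public Accum addInput(Accum accum, Integer input) {
     accum.sum += input;
     accum.count++;
     return accum;
   }
 
   @Override
   public Accum mergeAccumulators(Iterable<Accum> accums) {
     Accum merged = createAccumulator();
     for (Accum accum : accums) {
       merged.sum += accum.sum;
       merged.count += accum.count;
     }
     return merged;
   }
 
   @Override
   public Double extractOutput(Accum accum) {
     return accum.count == 0 ? 0.0 : ((double) accum.sum) / accum.count;
   }
 }
 ```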
 
@@ -576,7 +575,7 @@ pc = ...
 
 If you are combining a `PCollection` of key-value pairs, [per-key combining](#transforms-combine-per-key) is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy.
 
-##### **Combining a PCollection into a Single Value**
+##### **Combining a PCollection into a single value**
 
 Use the global combine to transform all of the elements in a given `PCollection` into a single value, represented in your pipeline as a new `PCollection` containing one element. The following example code shows how to apply the Beam-provided sum combine function to produce a single sum value for a `PCollection` of integers.
 
@@ -595,7 +594,7 @@ pc = ...
 result = pc | beam.CombineGlobally(sum)
 ```
 
-##### Global Windowing:
+##### Global windowing:
 
 If your input `PCollection` uses the default global windowing, the default behavior is to return a `PCollection` containing one item. That item's value comes from the accumulator in the combine function that you specified when applying `Combine`. For example, the Beam-provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.
 
@@ -613,7 +612,7 @@ sum = pc | beam.CombineGlobally(sum).without_defaults()
 
 ```
 
-##### Non-Global Windowing:
+##### Non-global windowing:
 
 If your `PCollection` uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying `Combine`:
 
@@ -621,7 +620,7 @@ If your `PCollection` uses any non-global windowing function, Beam does not prov
 * Specify `.asSingletonView`, in which the output is immediately converted to a `PCollectionView`, which will provide a default value for each empty window when used as a side input. You'll generally only need to use this option if the result of your pipeline's `Combine` is to be used as a side input later in the pipeline.
 
 
-##### <a name="transforms-combine-per-key"></a>**Combining Values in a Key-Grouped Collection**
+##### <a name="transforms-combine-per-key"></a>**Combining values in a key-grouped collection**
 
 After creating a key-grouped collection (for example, by using a `GroupByKey` transform) a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from `GroupByKey`, a key-grouped `PCollection` called `groupedWords` looks like this:
 
@@ -688,11 +687,11 @@ merged = (
     | beam.Flatten())
 ```
 
-##### Data Encoding in Merged Collections:
+##### Data encoding in merged collections:
 
 By default, the coder for the output `PCollection` is the same as the coder for the first `PCollection` in the input `PCollectionList`. However, the input `PCollection` objects can each use different coders, as long as they all contain the same data type in your chosen language.
 
-##### Merging Windowed Collections:
+##### Merging windowed collections:
 
 When using `Flatten` to merge `PCollection` objects that have a windowing strategy applied, all of the `PCollection` objects you want to merge must use a compatible windowing strategy and window sizing. For example, all of the collections you're merging must use identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
 
@@ -733,7 +732,7 @@ by_decile = students | beam.Partition(partition_fn, 10)
 fortieth_percentile = by_decile[4]
 ```
 
-#### <a name="transforms-usercodereqs"></a>General Requirements for Writing User Code for Beam Transforms
+#### <a name="transforms-usercodereqs"></a>General requirements for writing user code for Beam transforms
 
 When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on many different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.
 
@@ -758,7 +757,7 @@ Some other serializability factors you should keep in mind are:
 * Mutating a function object after it gets applied will have no effect.
 * Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class' state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.
 
-##### Thread-Compatibility
+##### Thread-compatibility
 
 Your function object should be thread-compatible. Each instance of your function object is accessed by a single thread on a worker instance, unless you explicitly create your own threads. Note, however, that **the Beam SDKs are not thread-safe**. If you create your own threads in your user code, you must provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be accessed from different threads.
 
@@ -768,14 +767,14 @@ It's recommended that you make your function object idempotent--that is, that it
 
 #### <a name="transforms-sideio"></a>Side Inputs and Side Outputs
 
-##### **Side Inputs**
+##### **Side inputs**
 
 In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. When you specify a side input, you create a view of some other data that can be read from within the `ParDo` transform's `DoFn` while processing each element.
 
 Side inputs are useful if your `ParDo` needs to inject additional data when processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.
 
 
-##### Passing Side Inputs to ParDo:
+##### Passing side inputs to ParDo:
 
 ```java
   // Pass side inputs to your ParDo transform by invoking .withSideInputs.
@@ -824,7 +823,7 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
 
 ```
 
-##### Side Inputs and Windowing:
+##### Side inputs and windowing:
 
 A windowed `PCollection` may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a `PCollectionView` of a windowed `PCollection`, the `PCollectionView` represents a single entity per window (one singleton per window, one list per window, etc.).
 
@@ -836,11 +835,11 @@ If the main input element exists in more than one window, then `processElement`
 
 If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.
 
-##### **Side Outputs**
+##### **Side outputs**
 
 While `ParDo` always produces a main output `PCollection` (as the return value from apply), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together.
 
-##### Tags for Side Outputs:
+##### Tags for side outputs:
 
 ```java
 // To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces.
@@ -902,7 +901,7 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
 {% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_iter
 %}```
 
-##### Emitting to Side Outputs in your DoFn:
+##### Emitting to side outputs in your DoFn:
 
 ```java
 // Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput.
@@ -948,77 +947,39 @@ When you create a pipeline, you often need to read data from some external sourc
 
 > A guide that covers how to implement your own Beam IO transforms is in progress ([BEAM-1025](https://issues.apache.org/jira/browse/BEAM-1025)).
 
-### Reading Input Data
+### Reading input data
 
-Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new `PCollection`, though it will be most common at the start of your pipeline. Here are examples of two common ways to read data.
-
-#### Reading from a `Source`:
-
-```java
-// A fully-specified Read from a GCS file:
-PCollection<Integer> numbers =
-  p.apply("ReadNumbers", TextIO.Read
-   .from("gs://my_bucket/path/to/numbers-*.txt")
-   .withCoder(TextualIntegerCoder.of()));
-```
-
-```python
-pipeline | beam.io.ReadFromText('protocol://path/to/some/inputData.txt')
-```
-
-Note that many sources use the builder pattern for setting options. For additional examples, see the language-specific documentation (such as Javadoc) for each of the sources.
+Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new `PCollection`, though it will be most common at the start of your pipeline.
 
 #### Using a read transform:
 
 ```java
-// This example uses JmsIO.
-PCollection<JmsRecord> output =
-    pipeline.apply(JmsIO.read()
-        .withConnectionFactory(myConnectionFactory)
-        .withQueue("my-queue"))
+PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt"));   
 ```
 
 ```python
-pipeline | beam.io.textio.ReadFromText('my_file_name')
+lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
 ```
 
-### Writing Output Data
+### Writing output data
 
-Write transforms write the data in a `PCollection` to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a `PCollection`'s data at any point in your pipeline. Here are examples of two common ways to write data.
-
-#### Writing to a `Sink`:
-
-```java
-// This example uses XmlSink.
-pipeline.apply(Write.to(
-          XmlSink.ofRecordClass(Type.class)
-              .withRootElementName(root_element)
-              .toFilenamePrefix(output_filename)));
-```
-
-```python
-output | beam.io.WriteToText('my_file_name')
-```
+Write transforms write the data in a `PCollection` to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a `PCollection`'s data at any point in your pipeline. 
 
-#### Using a write transform:
+#### Using a Write transform:
 
 ```java
-// This example uses JmsIO.
-pipeline.apply(...) // returns PCollection<String>
-        .apply(JmsIO.write()
-            .withConnectionFactory(myConnectionFactory)
-            .withQueue("my-queue")
+output.apply(TextIO.Write.to("gs://some/outputData"));
 ```
 
 ```python
-output | beam.io.textio.WriteToText('my_file_name')
+output | beam.io.WriteToText('gs://some/outputData')
 ```
 
 ### File-based input and output data
 
-#### Reading From Multiple Locations:
+#### Reading from multiple locations:
 
-Many read transforms support reading from multiple input files matching a glob operator you provide. The following TextIO example uses a glob operator (\*) to read all matching input files that have prefix "input-" and the suffix ".csv" in the given location:
+Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator (\*) to read all matching input files that have prefix "input-" and the suffix ".csv" in the given location:
 
 ```java
 p.apply("ReadFromText",
@@ -1031,18 +992,18 @@ lines = p | beam.io.Read(
     beam.io.TextFileSource('protocol://my_bucket/path/to/input-*.csv'))
 ```
 
-To read data from disparate sources into a single `PCollection`, read each one independently and then use the `Flatten` transform to create a single `PCollection`.
+To read data from disparate sources into a single `PCollection`, read each one independently and then use the [Flatten](#transforms-flatten-partition) transform to create a single `PCollection`.
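
 A sketch of that pattern in Java (the bucket paths are placeholders):
 
 ```java
 // Sketch: read two locations independently, then Flatten them into one PCollection.
 PCollection<String> logs2016 = p.apply(TextIO.Read.from("protocol://my_bucket/2016/*.log"));
 PCollection<String> logs2017 = p.apply(TextIO.Read.from("protocol://my_bucket/2017/*.log"));
 PCollection<String> allLogs = PCollectionList.of(logs2016).and(logs2017)
     .apply(Flatten.<String>pCollections());
 ```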
 
-#### Writing To Multiple Output Files:
+#### Writing to multiple output files:
 
 For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can also specify a suffix to be appended to each output file name.
 
 The following write transform example writes multiple output files to a location. Each file has the prefix "numbers", a numeric tag, and the suffix ".csv".
 
 ```java
-records.apply(TextIO.Write.named("WriteToText")
-                          .to("protocol://my_bucket/path/to/numbers")
-                          .withSuffix(".csv"));
+records.apply("WriteToText",
+    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+                .withSuffix(".csv"));
 ```
 
 ```python
@@ -1050,7 +1011,7 @@ filtered_words | beam.io.WriteToText(
 'protocol://my_bucket/path/to/numbers', file_name_suffix='.csv')
 ```
 
-### Beam provided I/O APIs
+### Beam-provided I/O APIs
 
 See the language-specific source code directories for the Beam-supported I/O APIs. Specific documentation for each of these I/O sources will be added in the future. ([BEAM-1054](https://issues.apache.org/jira/browse/BEAM-1054))
 
@@ -1100,7 +1061,7 @@ See the language specific source code directories for the Beam supported I/O API
 </table>
 
 
-## <a name="running"></a>Running the Pipeline
+## <a name="running"></a>Running the pipeline
 
 To run your pipeline, use the `run` method. The program you create sends a specification for your pipeline to a pipeline runner, which then constructs and runs the actual series of pipeline operations. Pipelines are executed asynchronously by default.
 
@@ -1119,7 +1080,7 @@ pipeline.run().waitUntilFinish();
 ```
 
 ```python
-# Not currently supported.
+pipeline.run().wait_until_finish()
 ```
 
 <a name="transforms-composite"></a>