Posted to commits@beam.apache.org by da...@apache.org on 2016/11/24 06:07:49 UTC

[1/3] incubator-beam-site git commit: [BEAM-277] Add transforms section to the programming guide

Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 1c9f85626 -> 1b458f102


[BEAM-277] Add transforms section to the programming guide


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/3627a440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/3627a440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/3627a440

Branch: refs/heads/asf-site
Commit: 3627a440d0156faa63e4f57ad46a5e79acea84f8
Parents: 1c9f856
Author: melissa <me...@google.com>
Authored: Mon Nov 21 11:22:04 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 22:06:29 2016 -0800

----------------------------------------------------------------------
 src/documentation/programming-guide.md | 496 +++++++++++++++++++++++++++-
 1 file changed, 492 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/3627a440/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 648962a..18e7800 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -27,6 +27,7 @@ The **Beam Programming Guide** is intended for Beam users who want to use the Be
   * [Using ParDo](#transforms-pardo)
   * [Using GroupByKey](#transforms-gbk)
   * [Using Combine](#transforms-combine)
+  * [Using Flatten and Partition](#transforms-flatten-partition)
   * [General Requirements for Writing User Code for Beam Transforms](#transforms-usercodereqs)
   * [Side Inputs and Side Outputs](#transforms-sideio)
 * [I/O](#io)
@@ -205,7 +206,7 @@ However, note that a transform *does not consume or otherwise alter* the input c
 [Output PCollection 2] = [Input PCollection].apply([Transform 2])
 ```
 
-The resulting workflow graph from the branching pipeline abouve looks like this:
+The resulting workflow graph from the branching pipeline above looks like this:
 
 [Branching Graph Graphic]
 
@@ -222,7 +223,7 @@ Beam provides the following transforms, each of which represents a different pro
 * `ParDo`
 * `GroupByKey`
 * `Combine`
-* `Flatten`
+* `Flatten` and `Partition`
 
 #### <a name="transforms-pardo"></a>ParDo
 
@@ -374,10 +375,266 @@ tree, [2]
 Thus, `GroupByKey` represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values).
 
 > **A Note on Key/Value Pairs:** Beam represents key/value pairs slightly differently depending on the language and SDK you're using. In the Beam SDK for Java, you represent a key/value pair with an object of type `KV<K, V>`. In Python, you represent key/value pairs with 2-tuples.
-     
+
 
 #### <a name="transforms-combine"></a>Using Combine
 
+<span class="language-java">[`Combine`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Combine.html)</span><span class="language-python">[`Combine`](https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py)</span> is a Beam transform for combining collections of elements or values in your data. `Combine` has variants that work on entire `PCollection`s, and some that combine the values for each key in `PCollection`s of key/value pairs.
+
+When you apply a `Combine` transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.
+
+Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of `CombineFn` that has an accumulation type distinct from the input/output type.
+
+##### **Simple Combinations Using Simple Functions**
+
+The following example code shows a simple combine function.
+
+```java
+// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
+public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
+  @Override
+  public Integer apply(Iterable<Integer> input) {
+    int sum = 0;
+    for (int item : input) {
+      sum += item;
+    }
+    return sum;
+  }
+}
+```
+
+```python
+# A bounded sum of positive integers.
+def bounded_sum(values, bound=500):
+  return min(sum(values), bound)
+```
+
+##### **Advanced Combinations using CombineFn**
+
+For more complex combine functions, you can define a subclass of `CombineFn`. You should use `CombineFn` if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.
+
+A general combining operation consists of four operations. When you create a subclass of `CombineFn`, you must provide four operations by overriding the corresponding methods:
+
+1. **Create Accumulator** creates a new "local" accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.
+
+2. **Add Input** adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.
+
+3. **Merge Accumulators** merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.
+
+4. **Extract Output** performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.
+
+The following example code shows how to define a `CombineFn` that computes a mean average:
+
+```java
+public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
+  public static class Accum {
+    int sum = 0;
+    int count = 0;
+  }
+
+  @Override
+  public Accum createAccumulator() { return new Accum(); }
+
+  @Override
+  public Accum addInput(Accum accum, Integer input) {
+      accum.sum += input;
+      accum.count++;
+      return accum;
+  }
+
+  @Override
+  public Accum mergeAccumulators(Iterable<Accum> accums) {
+    Accum merged = createAccumulator();
+    for (Accum accum : accums) {
+      merged.sum += accum.sum;
+      merged.count += accum.count;
+    }
+    return merged;
+  }
+
+  @Override
+  public Double extractOutput(Accum accum) {
+    return ((double) accum.sum) / accum.count;
+  }
+}
+```
+
+```python
+pc = ...
+class AverageFn(beam.CombineFn):
+  def create_accumulator(self):
+    return (0.0, 0)
+
+  def add_input(self, (sum, count), input):
+    return sum + input, count + 1
+
+  def merge_accumulators(self, accumulators):
+    sums, counts = zip(*accumulators)
+    return sum(sums), sum(counts)
+
+  def extract_output(self, (sum, count)):
+    return sum / count if count else float('NaN')
+```
+
+If you are combining a `PCollection` of key-value pairs, [per-key combining](#transforms-combine-per-key) is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy.
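+
+A rough sketch of a key-aware combine follows. The `Combine.KeyedCombineFn` method signatures shown here are an assumption (each operation receiving the key as its first argument); check the javadoc for your SDK version before relying on them.
+
+```java
+// Hypothetical example: use MIN or MAX per key, depending on a naming convention in the key.
+// NOTE: the key-as-first-argument signatures below are assumed, not confirmed against the SDK.
+public class MinOrMaxPerKeyFn
+    extends Combine.KeyedCombineFn<String, Integer, Integer, Integer> {
+  @Override
+  public Integer createAccumulator(String key) {
+    return key.startsWith("min_") ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+  }
+
+  @Override
+  public Integer addInput(String key, Integer accum, Integer input) {
+    return key.startsWith("min_") ? Math.min(accum, input) : Math.max(accum, input);
+  }
+
+  @Override
+  public Integer mergeAccumulators(String key, Iterable<Integer> accums) {
+    Integer merged = createAccumulator(key);
+    for (Integer accum : accums) {
+      merged = addInput(key, merged, accum);
+    }
+    return merged;
+  }
+
+  @Override
+  public Integer extractOutput(String key, Integer accum) {
+    return accum;
+  }
+}
+```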
+
+##### **Combining a PCollection into a Single Value**
+
+Use the global combine to transform all of the elements in a given `PCollection` into a single value, represented in your pipeline as a new `PCollection` containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a `PCollection` of integers.
+
+```java
+// Sum.SumIntegerFn() combines the elements in the input PCollection.
+// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
+PCollection<Integer> pc = ...;
+PCollection<Integer> sum = pc.apply(
+   Combine.globally(new Sum.SumIntegerFn()));
+```
+
+```python
+# sum combines the elements in the input PCollection.
+# The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.
+pc = ...
+result = pc | beam.CombineGlobally(sum)
+```
+
+##### Global Windowing:
+
+If your input `PCollection` uses the default global windowing, the default behavior is to return a `PCollection` containing one item. That item's value comes from the accumulator in the combine function that you specified when applying `Combine`. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.
+
+To have `Combine` instead return an empty `PCollection` if the input is empty, specify `.withoutDefaults` when you apply your `Combine` transform, as in the following code example:
+
+```java
+PCollection<Integer> pc = ...;
+PCollection<Integer> sum = pc.apply(
+  Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
+```
+
+```python
+pc = ...
+result = pc | beam.CombineGlobally(sum).without_defaults()
+```
+
+##### Non-Global Windowing:
+
+If your `PCollection` uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying `Combine`:
+
+* Specify `.withoutDefaults`, in which case windows that are empty in the input `PCollection` are likewise empty in the output collection.
+* Specify `.asSingletonView`, in which the output is immediately converted to a `PCollectionView`, which provides a default value for each empty window when used as a side input (see the sketch below). You'll generally only need to use this option if the result of your pipeline's `Combine` is to be used as a side input later in the pipeline.
+
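+The following Java sketch shows the second option, assuming a `PCollection<Integer>` named `pc` that has a non-global windowing function applied:
+
+```java
+// asSingletonView produces a PCollectionView<Integer> that, when read as a side input,
+// yields a per-window value and falls back to a default for windows with no input.
+PCollection<Integer> pc = ...;
+final PCollectionView<Integer> sumView =
+    pc.apply(Combine.globally(new Sum.SumIntegerFn()).asSingletonView());
+```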
+
+##### <a name="transforms-combine-per-key"></a>**Combining Values in a Key-Grouped Collection**
+
+After creating a key-grouped collection (for example, by using a `GroupByKey` transform), a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from `GroupByKey`, a key-grouped `PCollection` called `groupedWords` looks like this:
+
+```
+  cat, [1,5,9]
+  dog, [5,2]
+  and, [1,2,6]
+  jump, [3]
+  tree, [2]
+  ...
+```
+
+In the above `PCollection`, each element has a string key (for example, "cat") and an iterable of integers for its value (for the first element, [1, 5, 9]). If your pipeline's next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key. This pattern of a `GroupByKey` followed by merging the collection of values is equivalent to Beam's Combine PerKey transform. The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of `CombineFn`.
+
+```java
+// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
+PCollection<KV<String, Double>> salesRecords = ...;
+PCollection<KV<String, Double>> totalSalesPerPerson =
+  salesRecords.apply(Combine.<String, Double, Double>perKey(
+    new Sum.SumDoubleFn()));
+
+// The combined value is of a different type than the original collection of values per key.
+// PCollection has keys of type String and values of type Integer, and the combined value is a Double.
+
+PCollection<KV<String, Integer>> playerAccuracy = ...;
+PCollection<KV<String, Double>> avgAccuracyPerPlayer =
+  playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
+    new MeanInts()));
+```
+
+```python
+# PCollection is grouped by key and the numeric values associated with each key are averaged into a float.
+player_accuracies = ...
+avg_accuracy_per_player = (player_accuracies
+                           | beam.CombinePerKey(
+                               beam.combiners.MeanCombineFn()))
+```
+
+#### <a name="transforms-flatten-partition"></a>Using Flatten and Partition
+
+<span class="language-java">[`Flatten`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Flatten.html)</span><span class="language-python">[`Flatten`](https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py)</span> and <span class="language-java">[`Partition`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Partition.html)</span><span class="language-python">[`Partition`](https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py)</span> are Beam transforms for `PCollection` objects that store the same data type. `Flatten` merges multiple `PCollection` objects into a single logical `PCollection`, and `Partition` splits a single `PCollection` into a fixed number of smaller collections.
+
+##### **Flatten**
+
+The following example shows how to apply a `Flatten` transform to merge multiple `PCollection` objects.
+
+```java
+// Flatten takes a PCollectionList of PCollection objects of a given type.
+// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
+PCollection<String> pc1 = ...;
+PCollection<String> pc2 = ...;
+PCollection<String> pc3 = ...;
+PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
+
+PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
+```
+
+```python
+# Flatten takes a tuple of PCollection objects.
+# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
+merged = (
+    (pcoll1, pcoll2, pcoll3)
+    # A list of tuples can be "piped" directly into a Flatten transform.
+    | beam.Flatten())
+```
+
+##### Data Encoding in Merged Collections:
+
+By default, the coder for the output `PCollection` is the same as the coder for the first `PCollection` in the input `PCollectionList`. However, the input `PCollection` objects can each use different coders, as long as they all contain the same data type in your chosen language.
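+
+As an illustration, the following sketch assigns two different (but type-compatible) coders to the inputs before merging them; the coder classes are the SDK's `StringUtf8Coder` and `StringDelegateCoder`, and the variable names are illustrative:
+
+```java
+// Two String collections that use different coders.
+PCollection<String> utf8Strings = pc1.setCoder(StringUtf8Coder.of());
+PCollection<String> delegateStrings = pc2.setCoder(StringDelegateCoder.of(String.class));
+
+// The merged output uses the coder of the first PCollection in the list (StringUtf8Coder here).
+PCollection<String> merged =
+    PCollectionList.of(utf8Strings).and(delegateStrings)
+        .apply(Flatten.<String>pCollections());
+```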
+
+##### Merging Windowed Collections:
+
+When using `Flatten` to merge `PCollection` objects that have a windowing strategy applied, all of the `PCollection` objects you want to merge must use a compatible windowing strategy and window sizing. For example, the collections you're merging must all use identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
+
+If your pipeline attempts to use `Flatten` to merge `PCollection` objects with incompatible windows, Beam generates an `IllegalStateException` error when your pipeline is constructed.
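+
+One way to satisfy this requirement is to apply the same windowing to each input before flattening. A sketch, assuming `pc1` and `pc2` are unwindowed `PCollection<String>` objects:
+
+```java
+// Apply identical 5-minute fixed windows to both inputs so their windowing strategies are compatible.
+PCollection<String> windowed1 =
+    pc1.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
+PCollection<String> windowed2 =
+    pc2.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
+
+PCollection<String> merged =
+    PCollectionList.of(windowed1).and(windowed2)
+        .apply(Flatten.<String>pCollections());
+```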
+
+##### **Partition**
+
+`Partition` divides the elements of a `PCollection` according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input `PCollection` into each resulting partition `PCollection`. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).
+
+The following example divides a `PCollection` into percentile groups.
+
+```java
+// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
+// In this example, we define the PartitionFn in-line.
+// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
+PCollection<Student> students = ...;
+// Split students up into 10 partitions, by percentile:
+PCollectionList<Student> studentsByPercentile =
+    students.apply(Partition.of(10, new PartitionFn<Student>() {
+        public int partitionFor(Student student, int numPartitions) {
+            return student.getPercentile()  // 0..99
+                 * numPartitions / 100;
+        }}));
+
+// You can extract each partition from the PCollectionList using the get method, as follows:
+PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
+```
+
+```python
+# Provide an int value with the desired number of result partitions, and a partitioning function (partition_fn in this example).
+# Returns a tuple of PCollection objects containing each of the resulting partitions as individual PCollection objects.
+def partition_fn(student, num_partitions):
+  return int(get_percentile(student) * num_partitions / 100)
+
+by_decile = students | beam.Partition(partition_fn, 10)
+
+# You can extract each partition from the tuple of PCollection objects as follows:
+fortieth_percentile = by_decile[4]
+```
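+
+Because the partition count must be fixed when the pipeline graph is built, one way to pass it in as a command-line option is through a custom `PipelineOptions` interface. The sketch below is illustrative (the `PartitionOptions` interface and the `numPartitions` option name are not part of the SDK):
+
+```java
+// A custom options interface that carries the partition count, e.g. --numPartitions=10.
+public interface PartitionOptions extends PipelineOptions {
+  @Description("Number of partitions to split students into")
+  @Default.Integer(10)
+  int getNumPartitions();
+  void setNumPartitions(int value);
+}
+
+// Read the option at graph construction time and use it to build the Partition transform.
+PartitionOptions options =
+    PipelineOptionsFactory.fromArgs(args).withValidation().as(PartitionOptions.class);
+int numPartitions = options.getNumPartitions();
+
+PCollectionList<Student> studentsByPercentile =
+    students.apply(Partition.of(numPartitions, new PartitionFn<Student>() {
+        public int partitionFor(Student student, int n) {
+            return student.getPercentile() * n / 100;
+        }}));
+```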
+
 #### <a name="transforms-usercodereqs"></a>General Requirements for Writing User Code for Beam Transforms
 
 When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.
@@ -411,10 +668,241 @@ Your function object should be thread-compatible. Each instance of your function
 
 It's recommended that you make your function object idempotent--that is, that it can be repeated or retried as often as necessary without causing unintended side effects. The Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline's output deterministic, and your transforms' behavior more predictable and easier to debug.
 
+#### <a name="transforms-sideio"></a>Side Inputs and Side Outputs
+
+##### **Side Inputs**
+
+In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. When you specify a side input, you create a view of some other data that can be read from within the `ParDo` transform's `DoFn` while processing each element.
+
+Side inputs are useful if your `ParDo` needs to inject additional data when processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.
+
+
+##### Passing Side Inputs to ParDo:
+
+```java
+  // Pass side inputs to your ParDo transform by invoking .withSideInputs.
+  // Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.
+
+  // The input PCollection to ParDo.
+  PCollection<String> words = ...;
+
+  // A PCollection of word lengths that we'll combine into a single value.
+  PCollection<Integer> wordLengths = ...; // Singleton PCollection
+
+  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
+  final PCollectionView<Integer> maxWordLengthCutOffView =
+     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());
+
+
+  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
+  PCollection<String> wordsBelowCutOff =
+  words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
+                    .of(new DoFn<String, String>() {
+      public void processElement(ProcessContext c) {
+        String word = c.element();
+        // In our DoFn, access the side input.
+        int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
+        if (word.length() <= lengthCutOff) {
+          c.output(word);
+        }
+  }}));
+```
+
+```python
+# Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.
+# Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their actual values.
+# For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable of the actual elements of pcoll being passed into each process invocation.
+# In this example, side inputs are passed to a FlatMap transform as extra arguments and consumed by filter_using_length.
+
+# Callable takes additional arguments.
+def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+  if lower_bound <= len(word) <= upper_bound:
+    yield word
+
+# Construct a deferred side input.
+avg_word_len = (words
+                | beam.Map(len)
+                | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
+
+# Call with explicit side inputs.
+small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
+
+# A single deferred side input.
+larger_than_average = (words | 'large' >> beam.FlatMap(
+    filter_using_length,
+    lower_bound=pvalue.AsSingleton(avg_word_len)))
+
+# Mix and match.
+small_but_nontrivial = words | beam.FlatMap(filter_using_length,
+                                            lower_bound=2,
+                                            upper_bound=pvalue.AsSingleton(
+                                                avg_word_len))
+
+
+# We can also pass side inputs to a ParDo transform, which will get passed to its process method.
+# The only change is that the first arguments are self and a context, rather than the PCollection element itself.
+
+class FilterUsingLength(beam.DoFn):
+  def process(self, context, lower_bound, upper_bound=float('inf')):
+    if lower_bound <= len(context.element) <= upper_bound:
+      yield context.element
+
+small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)
+...
+
+```
+
+##### Side Inputs and Windowing:
+
+A windowed `PCollection` may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a `PCollectionView` of a windowed `PCollection`, the `PCollectionView` represents a single entity per window (one singleton per window, one list per window, etc.).
+
+Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element's window into the side input's window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.
+
+For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.
+
+If the main input element exists in more than one window, then `processElement` gets called multiple times, once for each window. Each call to `processElement` projects the "current" window for the main input element, and thus might provide a different view of the side input each time.
+
+If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.
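+
+A sketch of the scenario described above, reusing the `words` and `wordLengths` collections from the earlier side input example (the one-minute and one-hour window sizes are illustrative):
+
+```java
+// Main input: words in 1-minute fixed windows.
+PCollection<String> windowedWords = words.apply(
+    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+// Side input: the maximum word length per hour, materialized as one singleton value per
+// hour-long window.
+final PCollectionView<Integer> hourlyMaxLengthView = wordLengths
+    .apply(Window.<Integer>into(FixedWindows.of(Duration.standardHours(1))))
+    .apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());
+
+// Each main input element's 1-minute window is projected onto the hour-long side input
+// windows, so c.sideInput(...) returns the value for the enclosing hour.
+PCollection<String> wordsBelowHourlyMax = windowedWords.apply(
+    ParDo.withSideInputs(hourlyMaxLengthView)
+         .of(new DoFn<String, String>() {
+      public void processElement(ProcessContext c) {
+        int maxLengthThisHour = c.sideInput(hourlyMaxLengthView);
+        if (c.element().length() <= maxLengthThisHour) {
+          c.output(c.element());
+        }
+      }}));
+```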
+
+##### **Side Outputs**
+
+While `ParDo` always produces a main output `PCollection` (as the return value from apply), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together.
+
+##### Tags for Side Outputs:
+
+```java
+// To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces.
+// For example, if your ParDo produces three output PCollections (the main output and two side outputs), you must create three TupleTags.
+// The following example code shows how to create TupleTags for a ParDo with a main output and two side outputs:
+
+  // Input PCollection to our ParDo.
+  PCollection<String> words = ...;
+
+  // The ParDo will filter words whose length is below a cutoff and add them to
+  // the main output PCollection<String>.
+  // If a word is above the cutoff, the ParDo will add the word length to a side output
+  // PCollection<Integer>.
+  // If a word starts with the string "MARKER", the ParDo will add that word to a different
+  // side output PCollection<String>.
+  final int wordLengthCutOff = 10;
+
+  // Create the TupleTags for the main and side outputs.
+  // Main output.
+  final TupleTag<String> wordsBelowCutOffTag =
+      new TupleTag<String>(){};
+  // Word lengths side output.
+  final TupleTag<Integer> wordLengthsAboveCutOffTag =
+      new TupleTag<Integer>(){};
+  // "MARKER" words side output.
+  final TupleTag<String> markedWordsTag =
+      new TupleTag<String>(){};
+
+// Passing Output Tags to ParDo:
+// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking .withOutputTags.
+// You pass the tag for the main output first, and then the tags for any side outputs in a TupleTagList.
+// Building on our previous example, we pass the three TupleTags (one for the main output and two for the side outputs) to our ParDo.
+// Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.
+
+  PCollectionTuple results =
+      words.apply(
+          ParDo
+          // Specify the tag for the main output, wordsBelowCutOffTag.
+          .withOutputTags(wordsBelowCutOffTag,
+          // Specify the tags for the two side outputs as a TupleTagList.
+                          TupleTagList.of(wordLengthsAboveCutOffTag)
+                                      .and(markedWordsTag))
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          }
+```
+
+```python
+# To emit elements to a side output PCollection, invoke with_outputs() on the ParDo, optionally specifying the expected tags for the output.
+# with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object.
+# The tags give access to the corresponding output PCollections.
+
+results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
+           .with_outputs('above_cutoff_lengths', 'marked strings',
+                         main='below_cutoff_strings'))
+below = results.below_cutoff_strings
+above = results.above_cutoff_lengths
+marked = results['marked strings']  # indexing works as well
+
+# The result is also iterable, ordered in the same order that the tags were passed to with_outputs(), the main tag (if specified) first.
+
+below, above, marked = (words
+                        | beam.ParDo(
+                            ProcessWords(), cutoff_length=2, marker='x')
+                        .with_outputs('above_cutoff_lengths',
+                                      'marked strings',
+                                      main='below_cutoff_strings'))
+```
+
+##### Emitting to Side Outputs in your DoFn:
+
+```java
+// Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput.
+// Pass the appropriate TupleTag for the target side output collection when you call ProcessContext.sideOutput.
+// After your ParDo, extract the resulting main and side output PCollections from the returned PCollectionTuple.
+// Based on the previous example, this shows the DoFn emitting to the main and side outputs.
+
+  .of(new DoFn<String, String>() {
+     public void processElement(ProcessContext c) {
+       String word = c.element();
+       if (word.length() <= wordLengthCutOff) {
+         // Emit this short word to the main output.
+         c.output(word);
+       } else {
+         // Emit this long word's length to a side output.
+         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+       }
+       if (word.startsWith("MARKER")) {
+         // Emit this word to a different side output.
+         c.sideOutput(markedWordsTag, word);
+       }
+     }}));
+
+```
+
+```python
+# Inside your ParDo's DoFn, you can emit an element to a side output by wrapping the value and the output tag (str)
+# using the pvalue.SideOutputValue wrapper class.
+# Based on the previous example, this shows the DoFn emitting to the main and side outputs.
+
+class ProcessWords(beam.DoFn):
+
+  def process(self, context, cutoff_length, marker):
+    if len(context.element) <= cutoff_length:
+      # Emit this short word to the main output.
+      yield context.element
+    else:
+      # Emit this long word's length to a side output.
+      yield pvalue.SideOutputValue(
+          'above_cutoff_lengths', len(context.element))
+    if context.element.startswith(marker):
+      # Emit this word to a different side output.
+      yield pvalue.SideOutputValue('marked strings', context.element)
+
+
+# Side outputs are also available in Map and FlatMap.
+# Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.
+
+def even_odd(x):
+  yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
+  if x % 10 == 0:
+    yield x
+
+results = numbers | beam.FlatMap(even_odd).with_outputs()
+
+evens = results.even
+odds = results.odd
+tens = results[None]  # the undeclared main output
+```
+
 <a name="io"></a>
 <a name="running"></a>
 <a name="transforms-composite"></a>
-<a name="transforms-sideio"></a>
 <a name="coders"></a>
 <a name="windowing"></a>
 <a name="triggers"></a>


[3/3] incubator-beam-site git commit: This closes #91

Posted by da...@apache.org.
This closes #91


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/1b458f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/1b458f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/1b458f10

Branch: refs/heads/asf-site
Commit: 1b458f10236595a8fe74ec96b25e405e8bda011e
Parents: 1c9f856 24eb912
Author: Davor Bonaci <da...@google.com>
Authored: Wed Nov 23 22:07:33 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 22:07:33 2016 -0800

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  | 501 ++++++++++++++++++-
 src/documentation/programming-guide.md          | 496 +++++++++++++++++-
 2 files changed, 990 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam-site git commit: Regenerate website

Posted by da...@apache.org.
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/24eb9127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/24eb9127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/24eb9127

Branch: refs/heads/asf-site
Commit: 24eb91271785ff16d09c3b69959c7c6f8a9d7e20
Parents: 3627a44
Author: Davor Bonaci <da...@google.com>
Authored: Wed Nov 23 22:07:33 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 22:07:33 2016 -0800

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  | 501 ++++++++++++++++++-
 1 file changed, 498 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/24eb9127/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index d941e42..aa97bb6 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -172,6 +172,7 @@
       <li><a href="#transforms-pardo">Using ParDo</a></li>
       <li><a href="#transforms-gbk">Using GroupByKey</a></li>
       <li><a href="#transforms-combine">Using Combine</a></li>
+      <li><a href="#transforms-flatten-partition">Using Flatten and Partition</a></li>
       <li><a href="#transforms-usercodereqs">General Requirements for Writing User Code for Beam Transforms</a></li>
       <li><a href="#transforms-sideio">Side Inputs and Side Outputs</a></li>
     </ul>
@@ -364,7 +365,7 @@
 </code></pre>
 </div>
 
-<p>The resulting workflow graph from the branching pipeline abouve looks like this:</p>
+<p>The resulting workflow graph from the branching pipeline above looks like this:</p>
 
 <p>[Branching Graph Graphic]</p>
 
@@ -382,7 +383,7 @@
   <li><code class="highlighter-rouge">ParDo</code></li>
   <li><code class="highlighter-rouge">GroupByKey</code></li>
   <li><code class="highlighter-rouge">Combine</code></li>
-  <li><code class="highlighter-rouge">Flatten</code></li>
+  <li><code class="highlighter-rouge">Flatten</code> and <code class="highlighter-rouge">Partition</code></li>
 </ul>
 
 <h4 id="a-nametransforms-pardoapardo"><a name="transforms-pardo"></a>ParDo</h4>
@@ -552,6 +553,270 @@ tree, [2]
 
 <h4 id="a-nametransforms-combineausing-combine"><a name="transforms-combine"></a>Using Combine</h4>
 
+<p><span class="language-java"><a href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Combine.html"><code class="highlighter-rouge">Combine</code></a></span><span class="language-python"><a href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Combine</code></a></span> is a Beam transform for combining collections of elements or values in your data. <code class="highlighter-rouge">Combine</code> has variants that work on entire <code class="highlighter-rouge">PCollection</code>s, and some that combine the values for each key in <code class="highlighter-rouge">PCollection</code>s of key/value pairs.</p>
+
+<p>When you apply a <code class="highlighter-rouge">Combine</code> transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.</p>
+
+<p>Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of <code class="highlighter-rouge">CombineFn</code> that has an accumulation type distinct from the input/output type.</p>
+
+<h5 id="simple-combinations-using-simple-functions"><strong>Simple Combinations Using Simple Functions</strong></h5>
+
+<p>The following example code shows a simple combine function.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.</span>
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">SumInts</span> <span class="kd">implements</span> <span class="n">SerializableFunction</span><span class="o">&lt;</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">sum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">item</span> <span class="o">:</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">sum</span> <span class="o">+=</span> <span class="n">item</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">sum</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># A bounded sum of positive integers.</span>
+<span class="k">def</span> <span class="nf">bounded_sum</span><span class="p">(</span><span class="n">values</span><span class="p">,</span> <span class="n">bound</span><span class="o">=</span><span class="mi">500</span><span class="p">):</span>
+  <span class="k">return</span> <span class="nb">min</span><span class="p">(</span><span class="nb">sum</span><span class="p">(</span><span class="n">values</span><span class="p">),</span> <span class="n">bound</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<h5 id="advanced-combinations-using-combinefn"><strong>Advanced Combinations using CombineFn</strong></h5>
+
+<p>For more complex combine functions, you can define a subclass of <code class="highlighter-rouge">CombineFn</code>. You should use <code class="highlighter-rouge">CombineFn</code> if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.</p>
+
+<p>A general combining operation consists of four operations. When you create a subclass of <code class="highlighter-rouge">CombineFn</code>, you must provide four operations by overriding the corresponding methods:</p>
+
+<ol>
+  <li>
+    <p><strong>Create Accumulator</strong> creates a new “local” accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.</p>
+  </li>
+  <li>
+    <p><strong>Add Input</strong> adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.</p>
+  </li>
+  <li>
+    <p><strong>Merge Accumulators</strong> merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.</p>
+  </li>
+  <li>
+    <p><strong>Extract Output</strong> performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.</p>
+  </li>
+</ol>
+
+<p>The following example code shows how to define a <code class="highlighter-rouge">CombineFn</code> that computes a mean average:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">class</span> <span class="nc">AverageFn</span> <span class="kd">extends</span> <span class="n">CombineFn</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">AverageFn</span><span class="o">.</span><span class="na">Accum</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Accum</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">sum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span class="nf">createAccumulator</span><span class="o">()</span> <span class="o">{</span> <span class="k">return</span> <span class="k">new</span> <span class="n">Accum</span><span class="o">();</span> <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span class="nf">addInput</span><span class="o">(</span><span class="n">Accum</span> <span class="n">accum</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">accum</span><span class="o">.</span><span class="na">sum</span> <span class="o">+=</span> <span class="n">input</span><span class="o">;</span>
+      <span class="n">accum</span><span class="o">.</span><span class="na">count</span><span class="o">++;</span>
+      <span class="k">return</span> <span class="n">accum</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Accum</span> <span class="nf">mergeAccumulators</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Accum</span><span class="o">&gt;</span> <span class="n">accums</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">Accum</span> <span class="n">merged</span> <span class="o">=</span> <span class="n">createAccumulator</span><span class="o">();</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">Accum</span> <span class="n">accum</span> <span class="o">:</span> <span class="n">accums</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">merged</span><span class="o">.</span><span class="na">sum</span> <span class="o">+=</span> <span class="n">accum</span><span class="o">.</span><span class="na">sum</span><span class="o">;</span>
+      <span class="n">merged</span><span class="o">.</span><span class="na">count</span> <span class="o">+=</span> <span class="n">accum</span><span class="o">.</span><span class="na">count</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">merged</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Double</span> <span class="nf">extractOutput</span><span class="o">(</span><span class="n">Accum</span> <span class="n">accum</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">accum</span><span class="o">.</span><span class="na">sum</span><span class="o">)</span> <span class="o">/</span> <span class="n">accum</span><span class="o">.</span><span class="na">count</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
+<span class="k">class</span> <span class="nc">AverageFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">CombineFn</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+    <span class="k">return</span> <span class="p">(</span><span class="mf">0.0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
+
+  <span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">),</span> <span class="nb">input</span><span class="p">):</span>
+    <span class="k">return</span> <span class="nb">sum</span> <span class="o">+</span> <span class="nb">input</span><span class="p">,</span> <span class="n">count</span> <span class="o">+</span> <span class="mi">1</span>
+
+  <span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">):</span>
+    <span class="n">sums</span><span class="p">,</span> <span class="n">counts</span> <span class="o">=</span> <span class="nb">zip</span><span class="p">(</span><span class="o">*</span><span class="n">accumulators</span><span class="p">)</span>
+    <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">sums</span><span class="p">),</span> <span class="nb">sum</span><span class="p">(</span><span class="n">counts</span><span class="p">)</span>
+
+  <span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">)):</span>
+    <span class="k">return</span> <span class="nb">sum</span> <span class="o">/</span> <span class="n">count</span> <span class="k">if</span> <span class="n">count</span> <span class="k">else</span> <span class="nb">float</span><span class="p">(</span><span class="s">'NaN'</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<p>If you are combining a <code class="highlighter-rouge">PCollection</code> of key-value pairs, <a href="#transforms-combine-per-key">per-key combining</a> is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a <code class="highlighter-rouge">KeyedCombineFn</code> to access the key within the combining strategy.</p>
+
+<h5 id="combining-a-pcollection-into-a-single-value"><strong>Combining a PCollection into a Single Value</strong></h5>
+
+<p>Use the global combine to transform all of the elements in a given <code class="highlighter-rouge">PCollection</code> into a single value, represented in your pipeline as a new <code class="highlighter-rouge">PCollection</code> containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a <code class="highlighter-rouge">PCollection</code> of integers.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Sum.SumIntegerFn() combines the elements in the input PCollection.</span>
+<span class="c1">// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> <span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+   <span class="n">Combine</span><span class="o">.</span><span class="na">globally</span><span class="o">(</span><span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumIntegerFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># sum combines the elements in the input PCollection.</span>
+<span class="c"># The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.</span>
+<span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
+<span class="n">result</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<h5 id="global-windowing">Global Windowing:</h5>
+
+<p>If your input <code class="highlighter-rouge">PCollection</code> uses the default global windowing, the default behavior is to return a <code class="highlighter-rouge">PCollection</code> containing one item. That item’s value comes from the accumulator in the combine function that you specified when applying <code class="highlighter-rouge">Combine</code>. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.</p>
+
+<p>To have <code class="highlighter-rouge">Combine</code> instead return an empty <code class="highlighter-rouge">PCollection</code> if the input is empty, specify <code class="highlighter-rouge">.withoutDefaults</code> when you apply your <code class="highlighter-rouge">Combine</code> transform, as in the following code example:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> <span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+  <span class="n">Combine</span><span class="o">.</span><span class="na">globally</span><span class="o">(</span><span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumIntegerFn</span><span class="o">()).</span><span class="na">withoutDefaults</span><span class="o">());</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
+<span class="nb">sum</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span><span class="o">.</span><span class="n">without_defaults</span><span class="p">()</span>
+
+</code></pre>
+</div>
+
+<h5 id="non-global-windowing">Non-Global Windowing:</h5>
+
+<p>If your <code class="highlighter-rouge">PCollection</code> uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying <code class="highlighter-rouge">Combine</code>:</p>
+
+<ul>
+  <li>Specify <code class="highlighter-rouge">.withoutDefaults</code>, where windows that are empty in the input <code class="highlighter-rouge">PCollection</code> will likewise be empty in the output collection.</li>
+  <li>Specify <code class="highlighter-rouge">.asSingletonView</code>, in which the output is immediately converted to a <code class="highlighter-rouge">PCollectionView</code>, which will provide a default value for each empty window when used as a side input. You’ll generally only need to use this option if the result of your pipeline’s <code class="highlighter-rouge">Combine</code> is to be used as a side input later in the pipeline.</li>
+</ul>
+
+<h5 id="a-nametransforms-combine-per-keyacombining-values-in-a-key-grouped-collection"><a name="transforms-combine-per-key"></a><strong>Combining Values in a Key-Grouped Collection</strong></h5>
+
+<p>After creating a key-grouped collection (for example, by using a <code class="highlighter-rouge">GroupByKey</code> transform) a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from <code class="highlighter-rouge">GroupByKey</code>, a key-grouped <code class="highlighter-rouge">PCollection</code> called <code class="highlighter-rouge">groupedWords</code> looks like this:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>  cat, [1,5,9]
+  dog, [5,2]
+  and, [1,2,6]
+  jump, [3]
+  tree, [2]
+  ...
+</code></pre>
+</div>
+
+<p>In the above <code class="highlighter-rouge">PCollection</code>, each element has a string key (for example, “cat”) and an iterable of integers for its value (in the first element, containing [1, 5, 9]). If our pipeline’s next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key. This pattern of a <code class="highlighter-rouge">GroupByKey</code> followed by merging the collection of values is equivalent to Beam’s Combine PerKey transform. The combine function you supply to Combine PerKey must be an associative reduction function or a subclass of <code class="highlighter-rouge">CombineFn</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// PCollection is grouped by key and the Double values associated with each key are combined into a Double.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">salesRecords</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">totalSalesPerPerson</span> <span class="o">=</span>
+  <span class="n">salesRecords</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span><span class="n">perKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumDoubleFn</span><span class="o">()));</span>
+
+<span class="c1">// The combined value is of a different type than the original collection of values per key.</span>
+<span class="c1">// PCollection has keys of type String and values of type Integer, and the combined value is a Double.</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">playerAccuracy</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">avgAccuracyPerPlayer</span> <span class="o">=</span>
+  <span class="n">playerAccuracy</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span><span class="n">perKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="nf">MeanInts</span><span class="o">())));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># PCollection is grouped by key and the numeric values associated with each key are averaged into a float.</span>
+<span class="n">player_accuracies</span> <span class="o">=</span> <span class="o">...</span>
+<span class="n">avg_accuracy_per_player</span> <span class="o">=</span> <span class="p">(</span><span class="n">player_accuracies</span>
+                           <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span>
+                               <span class="n">beam</span><span class="o">.</span><span class="n">combiners</span><span class="o">.</span><span class="n">MeanCombineFn</span><span class="p">()))</span>
+</code></pre>
+</div>
+
+<h4 id="a-nametransforms-flatten-partitionausing-flatten-and-partition"><a name="transforms-flatten-partition"></a>Using Flatten and Partition</h4>
+
+<p><span class="language-java"><a href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Flatten.html"><code class="highlighter-rouge">Flatten</code></a></span><span class="language-python"><a href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Flatten</code></a></span> and <span class="language-java"><a href="/documentation/sdks/javadoc/0.3.0-incubating/index.html?org/apache/beam/sdk/transforms/Partition.html"><code class="highlighter-rouge">Partition</code></a></span><span class="language-python"><a href="https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Partition</code></a></span> are Beam transforms for <code class="highlighter-rouge">PCollection</code> objects that store the same data type. <code class="highlighter-rouge">Flatten</code> merges multiple <code class="highligh
 ter-rouge">PCollection</code> objects into a single logical <code class="highlighter-rouge">PCollection</code>, and <code class="highlighter-rouge">Partition</code> splits a single <code class="highlighter-rouge">PCollection</code> into a fixed number of smaller collections.</p>
+
+<h5 id="flatten"><strong>Flatten</strong></h5>
+
+<p>The following example shows how to apply a <code class="highlighter-rouge">Flatten</code> transform to merge multiple <code class="highlighter-rouge">PCollection</code> objects.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Flatten takes a PCollectionList of PCollection objects of a given type.</span>
+<span class="c1">// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc1</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc2</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc3</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">PCollectionList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collections</span> <span class="o">=</span> <span class="n">PCollectionList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">pc1</span><span class="o">).</span><span class="na">and</span><span class="o">(</span><span class="n">pc2</span><span class="o">).</span><span class="na">and</span><span class="o">(</span><span class="n">pc3</span><span class="o">);</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">merged</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Flatten</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">pCollections</span><span class="o">());</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection objects.</span>
+<span class="c"># Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.</span>
+<span class="n">merged</span> <span class="o">=</span> <span class="p">(</span>
+    <span class="p">(</span><span class="n">pcoll1</span><span class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span class="n">pcoll3</span><span class="p">)</span>
+    <span class="c"># A list of tuples can be "piped" directly into a Flatten transform.</span>
+    <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span>
+</code></pre>
+</div>
+
+<h5 id="data-encoding-in-merged-collections">Data Encoding in Merged Collections:</h5>
+
+<p>By default, the coder for the output <code class="highlighter-rouge">PCollection</code> is the same as the coder for the first <code class="highlighter-rouge">PCollection</code> in the input <code class="highlighter-rouge">PCollectionList</code>. However, the input <code class="highlighter-rouge">PCollection</code> objects can each use different coders, as long as they all contain the same data type in your chosen language.</p>
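+
+<p>For example, in Java (a minimal sketch with hypothetical inputs), you can flatten collections whose coders differ, and override the output coder explicitly if the default is not what you want:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Both inputs contain Strings, but were created with different coders.
+PCollection&lt;String&gt; utf8Strings = ...;    // uses StringUtf8Coder
+PCollection&lt;String&gt; customStrings = ...;  // uses a custom Coder&lt;String&gt;
+
+PCollection&lt;String&gt; merged =
+    PCollectionList.of(utf8Strings).and(customStrings)
+        .apply(Flatten.&lt;String&gt;pCollections());
+
+// By default, merged uses the coder of the first collection in the list (utf8Strings);
+// set a coder explicitly if you need a different one.
+merged.setCoder(StringUtf8Coder.of());
+</code></pre>
+</div>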
+
+<h5 id="merging-windowed-collections">Merging Windowed Collections:</h5>
+
+<p>When using <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects that have a windowing strategy applied, all of the <code class="highlighter-rouge">PCollection</code> objects you want to merge must use a compatible windowing strategy and window sizing. For example, the collections you’re merging must all use identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.</p>
+
+<p>If your pipeline attempts to use <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects with incompatible windows, Beam generates an <code class="highlighter-rouge">IllegalStateException</code> error when your pipeline is constructed.</p>
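+
+<p>A minimal Java sketch (hypothetical names) of merging two collections after giving them identical fixed windows:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Apply the same windowing strategy to each input before merging.
+PCollection&lt;String&gt; windowedLogs1 =
+    logs1.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(5))));
+PCollection&lt;String&gt; windowedLogs2 =
+    logs2.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(5))));
+
+// The windowing strategies are compatible, so Flatten succeeds at construction time.
+PCollection&lt;String&gt; mergedLogs =
+    PCollectionList.of(windowedLogs1).and(windowedLogs2)
+        .apply(Flatten.&lt;String&gt;pCollections());
+</code></pre>
+</div>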
+
+<h5 id="partition"><strong>Partition</strong></h5>
+
+<p><code class="highlighter-rouge">Partition</code> divides the elements of a <code class="highlighter-rouge">PCollection</code> according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input <code class="highlighter-rouge">PCollection</code> into each resulting partition <code class="highlighter-rouge">PCollection</code>. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).</p>
+
+<p>The following example divides a <code class="highlighter-rouge">PCollection</code> into percentile groups.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.</span>
+<span class="c1">// In this example, we define the PartitionFn in-line.</span>
+<span class="c1">// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">students</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="c1">// Split students up into 10 partitions, by percentile:</span>
+<span class="n">PCollectionList</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">studentsByPercentile</span> <span class="o">=</span>
+    <span class="n">students</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Partition</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="k">new</span> <span class="n">PartitionFn</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;()</span> <span class="o">{</span>
+        <span class="kd">public</span> <span class="kt">int</span> <span class="nf">partitionFor</span><span class="o">(</span><span class="n">Student</span> <span class="n">student</span><span class="o">,</span> <span class="kt">int</span> <span class="n">numPartitions</span><span class="o">)</span> <span class="o">{</span>
+            <span class="k">return</span> <span class="n">student</span><span class="o">.</span><span class="na">getPercentile</span><span class="o">()</span>  <span class="c1">// 0..99</span>
+                 <span class="o">*</span> <span class="n">numPartitions</span> <span class="o">/</span> <span class="mi">100</span><span class="o">;</span>
+        <span class="o">}}));</span>
+
+<span class="c1">// You can extract each partition from the PCollectionList using the get method, as follows:</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">fortiethPercentile</span> <span class="o">=</span> <span class="n">studentsByPercentile</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">4</span><span class="o">);</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># Provide an int value with the desired number of result partitions, and a partitioning function (partition_fn in this example).</span>
+<span class="c"># Returns a tuple of PCollection objects containing each of the resulting partitions as individual PCollection objects.</span>
+<span class="k">def</span> <span class="nf">partition_fn</span><span class="p">(</span><span class="n">student</span><span class="p">,</span> <span class="n">num_partitions</span><span class="p">):</span>
+  <span class="k">return</span> <span class="nb">int</span><span class="p">(</span><span class="n">get_percentile</span><span class="p">(</span><span class="n">student</span><span class="p">)</span> <span class="o">*</span> <span class="n">num_partitions</span> <span class="o">/</span> <span class="mi">100</span><span class="p">)</span>
+
+<span class="n">by_decile</span> <span class="o">=</span> <span class="n">students</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span><span class="n">partition_fn</span><span class="p">,</span> <span class="mi">10</span><span class="p">)</span>
+
+<span class="c"># You can extract each partition from the tuple of PCollection objects as follows:</span>
+<span class="n">fortieth_percentile</span> <span class="o">=</span> <span class="n">by_decile</span><span class="p">[</span><span class="mi">4</span><span class="p">]</span>
+</code></pre>
+</div>
+
 <h4 id="a-nametransforms-usercodereqsageneral-requirements-for-writing-user-code-for-beam-transforms"><a name="transforms-usercodereqs"></a>General Requirements for Writing User Code for Beam Transforms</h4>
 
 <p>When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.</p>
@@ -591,10 +856,240 @@ tree, [2]
 
 <p>It’s recommended that you make your function object idempotent; that is, it can be repeated or retried as often as necessary without causing unintended side effects. The Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline’s output deterministic, and your transforms’ behavior more predictable and easier to debug.</p>
 
+<h4 id="a-nametransforms-sideioaside-inputs-and-side-outputs"><a name="transforms-sideio"></a>Side Inputs and Side Outputs</h4>
+
+<h5 id="side-inputs"><strong>Side Inputs</strong></h5>
+
+<p>In addition to the main input <code class="highlighter-rouge">PCollection</code>, you can provide additional inputs to a <code class="highlighter-rouge">ParDo</code> transform in the form of side inputs. A side input is an additional input that your <code class="highlighter-rouge">DoFn</code> can access each time it processes an element in the input <code class="highlighter-rouge">PCollection</code>. When you specify a side input, you create a view of some other data that can be read from within the <code class="highlighter-rouge">ParDo</code> transform’s <code class="highlighter-rouge">DoFn</code> while processing each element.</p>
+
+<p>Side inputs are useful if your <code class="highlighter-rouge">ParDo</code> needs to inject additional data when processing each element in the input <code class="highlighter-rouge">PCollection</code>, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.</p>
+
+<h5 id="passing-side-inputs-to-pardo">Passing Side Inputs to ParDo:</h5>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="c1">// Pass side inputs to your ParDo transform by invoking .withSideInputs.</span>
+  <span class="c1">// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.</span>
+
+  <span class="c1">// The input PCollection to ParDo.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
+
+  <span class="c1">// A PCollection of word lengths that we'll combine into a single value.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">wordLengths</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// Singleton PCollection</span>
+
+  <span class="c1">// Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.</span>
+  <span class="kd">final</span> <span class="n">PCollectionView</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">maxWordLengthCutOffView</span> <span class="o">=</span>
+     <span class="n">wordLengths</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.</span><span class="na">globally</span><span class="o">(</span><span class="k">new</span> <span class="n">Max</span><span class="o">.</span><span class="na">MaxIntFn</span><span class="o">()).</span><span class="na">asSingletonView</span><span class="o">());</span>
+
+
+  <span class="c1">// Apply a ParDo that takes maxWordLengthCutOffView as a side input.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">wordsBelowCutOff</span> <span class="o">=</span>
+  <span class="n">words</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">maxWordLengthCutOffView</span><span class="o">)</span>
+                    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">String</span> <span class="n">word</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span>
+        <span class="c1">// In our DoFn, access the side input.</span>
+        <span class="kt">int</span> <span class="n">lengthCutOff</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">maxWordLengthCutOffView</span><span class="o">);</span>
+        <span class="k">if</span> <span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&lt;=</span> <span class="n">lengthCutOff</span><span class="o">)</span> <span class="o">{</span>
+          <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
+        <span class="o">}</span>
+  <span class="o">}}));</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.</span>
+<span class="c"># Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their actual values.</span>
+<span class="c"># For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable of the actual elements of pcoll being passed into each process invocation.</span>
+<span class="c"># In this example, side inputs are passed to a FlatMap transform as extra arguments and consumed by filter_using_length.</span>
+
+<span class="c"># Callable takes additional arguments.</span>
+<span class="k">def</span> <span class="nf">filter_using_length</span><span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="n">lower_bound</span><span class="p">,</span> <span class="n">upper_bound</span><span class="o">=</span><span class="nb">float</span><span class="p">(</span><span class="s">'inf'</span><span class="p">)):</span>
+  <span class="k">if</span> <span class="n">lower_bound</span> <span class="o">&lt;=</span> <span class="nb">len</span><span class="p">(</span><span class="n">word</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="n">upper_bound</span><span class="p">:</span>
+    <span class="k">yield</span> <span class="n">word</span>
+
+<span class="c"># Construct a deferred side input.</span>
+<span class="n">avg_word_len</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span>
+                <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="nb">len</span><span class="p">)</span>
+                <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">combiners</span><span class="o">.</span><span class="n">MeanCombineFn</span><span class="p">()))</span>
+
+<span class="c"># Call with explicit side inputs.</span>
+<span class="n">small_words</span> <span class="o">=</span> <span class="n">words</span> <span class="o">|</span> <span class="s">'small'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="n">filter_using_length</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">3</span><span class="p">)</span>
+
+<span class="c"># A single deferred side input.</span>
+<span class="n">larger_than_average</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span> <span class="o">|</span> <span class="s">'large'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span>
+    <span class="n">filter_using_length</span><span class="p">,</span>
+    <span class="n">lower_bound</span><span class="o">=</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span><span class="n">avg_word_len</span><span class="p">)))</span>
+
+<span class="c"># Mix and match.</span>
+<span class="n">small_but_nontrivial</span> <span class="o">=</span> <span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="n">filter_using_length</span><span class="p">,</span>
+                                            <span class="n">lower_bound</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span>
+                                            <span class="n">upper_bound</span><span class="o">=</span><span class="n">pvalue</span><span class="o">.</span><span class="n">AsSingleton</span><span class="p">(</span>
+                                                <span class="n">avg_word_len</span><span class="p">))</span>
+
+
+<span class="c"># We can also pass side inputs to a ParDo transform, which will get passed to its process method.</span>
+<span class="c"># The only change is that the first arguments are self and a context, rather than the PCollection element itself.</span>
+
+<span class="k">class</span> <span class="nc">FilterUsingLength</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">lower_bound</span><span class="p">,</span> <span class="n">upper_bound</span><span class="o">=</span><span class="nb">float</span><span class="p">(</span><span class="s">'inf'</span><span class="p">)):</span>
+    <span class="k">if</span> <span class="n">lower_bound</span> <span class="o">&lt;=</span> <span class="nb">len</span><span class="p">(</span><span class="n">context</span><span class="o">.</span><span class="n">element</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="n">upper_bound</span><span class="p">:</span>
+      <span class="k">yield</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span>
+
+<span class="n">small_words</span> <span class="o">=</span> <span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">FilterUsingLength</span><span class="p">(),</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">3</span><span class="p">)</span>
+<span class="o">...</span>
+
+</code></pre>
+</div>
+
+<h5 id="side-inputs-and-windowing">Side Inputs and Windowing:</h5>
+
+<p>A windowed <code class="highlighter-rouge">PCollection</code> may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a <code class="highlighter-rouge">PCollectionView</code> of a windowed <code class="highlighter-rouge">PCollection</code>, the <code class="highlighter-rouge">PCollectionView</code> represents a single entity per window (one singleton per window, one list per window, etc.).</p>
+
+<p>Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.</p>
+
+<p>For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.</p>
+
+<p>If the main input element exists in more than one window, then <code class="highlighter-rouge">processElement</code> gets called multiple times, once for each window. Each call to <code class="highlighter-rouge">processElement</code> projects the “current” window for the main input element, and thus might provide a different view of the side input each time.</p>
+
+<p>If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.</p>
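+
+<p>A minimal Java sketch of the one-minute/one-hour scenario above (hypothetical names, reusing the combine-and-view pattern shown earlier):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Main input: one-minute fixed windows.
+PCollection&lt;String&gt; events = ...;
+PCollection&lt;String&gt; windowedEvents =
+    events.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(1))));
+
+// Side input: one-hour fixed windows, combined into one singleton view per window.
+PCollection&lt;Integer&gt; thresholds = ...;
+final PCollectionView&lt;Integer&gt; hourlyThresholdView =
+    thresholds.apply(Window.&lt;Integer&gt;into(FixedWindows.of(Duration.standardHours(1))))
+              .apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());
+
+PCollection&lt;String&gt; output =
+    windowedEvents.apply(ParDo.withSideInputs(hourlyThresholdView)
+        .of(new DoFn&lt;String, String&gt;() {
+          public void processElement(ProcessContext c) {
+            // Beam projects this element's one-minute window onto the side input's
+            // hour-long windows and reads the value from the matching hour.
+            int threshold = c.sideInput(hourlyThresholdView);
+            ...
+          }
+        }));
+</code></pre>
+</div>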
+
+<h5 id="side-outputs"><strong>Side Outputs</strong></h5>
+
+<p>While <code class="highlighter-rouge">ParDo</code> always produces a main output <code class="highlighter-rouge">PCollection</code> (as the return value from apply), you can also have your <code class="highlighter-rouge">ParDo</code> produce any number of additional output <code class="highlighter-rouge">PCollection</code>s. If you choose to have multiple outputs, your <code class="highlighter-rouge">ParDo</code> returns all of the output <code class="highlighter-rouge">PCollection</code>s (including the main output) bundled together.</p>
+
+<h5 id="tags-for-side-outputs">Tags for Side Outputs:</h5>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces.</span>
+<span class="c1">// For example, if your ParDo produces three output PCollections (the main output and two side outputs), you must create three TupleTags.</span>
+<span class="c1">// The following example code shows how to create TupleTags for a ParDo with a main output and two side outputs:</span>
+
+  <span class="c1">// Input PCollection to our ParDo.</span>
+  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
+
+  <span class="c1">// The ParDo will filter words whose length is below a cutoff and add them to</span>
+  <span class="c1">// the main ouput PCollection&lt;String&gt;.</span>
+  <span class="c1">// If a word is above the cutoff, the ParDo will add the word length to a side output</span>
+  <span class="c1">// PCollection&lt;Integer&gt;.</span>
+  <span class="c1">// If a word starts with the string "MARKER", the ParDo will add that word to a different</span>
+  <span class="c1">// side output PCollection&lt;String&gt;.</span>
+  <span class="kd">final</span> <span class="kt">int</span> <span class="n">wordLengthCutOff</span> <span class="o">=</span> <span class="mi">10</span><span class="o">;</span>
+
+  <span class="c1">// Create the TupleTags for the main and side outputs.</span>
+  <span class="c1">// Main output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">wordsBelowCutOffTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(){};</span>
+  <span class="c1">// Word lengths side output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">wordLengthsAboveCutOffTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;(){};</span>
+  <span class="c1">// "MARKER" words side output.</span>
+  <span class="kd">final</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">markedWordsTag</span> <span class="o">=</span>
+      <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(){};</span>
+
+<span class="c1">// Passing Output Tags to ParDo:</span>
+<span class="c1">// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking .withOutputTags.</span>
+<span class="c1">// You pass the tag for the main output first, and then the tags for any side outputs in a TupleTagList.</span>
+<span class="c1">// Building on our previous example, we pass the three TupleTags (one for the main output and two for the side outputs) to our ParDo.</span>
+<span class="c1">// Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.</span>
+
+  <span class="n">PCollectionTuple</span> <span class="n">results</span> <span class="o">=</span>
+      <span class="n">words</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+          <span class="n">ParDo</span>
+          <span class="c1">// Specify the tag for the main output, wordsBelowCutoffTag.</span>
+          <span class="o">.</span><span class="na">withOutputTags</span><span class="o">(</span><span class="n">wordsBelowCutOffTag</span><span class="o">,</span>
+          <span class="c1">// Specify the tags for the two side outputs as a TupleTagList.</span>
+                          <span class="n">TupleTagList</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">wordLengthsAboveCutOffTag</span><span class="o">)</span>
+                                      <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">markedWordsTag</span><span class="o">))</span>
+          <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+            <span class="c1">// DoFn continues here.</span>
+            <span class="o">...</span>
+          <span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># To emit elements to a side output PCollection, invoke with_outputs() on the ParDo, optionally specifying the expected tags for the output.</span>
+<span class="c"># with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object.</span>
+<span class="c"># The tags give access to the corresponding output PCollections.</span>
+
+<span class="n">results</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">ProcessWords</span><span class="p">(),</span> <span class="n">cutoff_length</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">marker</span><span class="o">=</span><span class="s">'x'</span><span class="p">)</span>
+           <span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="s">'above_cutoff_lengths'</span><span class="p">,</span> <span class="s">'marked strings'</span><span class="p">,</span>
+                         <span class="n">main</span><span class="o">=</span><span class="s">'below_cutoff_strings'</span><span class="p">))</span>
+<span class="n">below</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">below_cutoff_strings</span>
+<span class="n">above</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">above_cutoff_lengths</span>
+<span class="n">marked</span> <span class="o">=</span> <span class="n">results</span><span class="p">[</span><span class="s">'marked strings'</span><span class="p">]</span>  <span class="c"># indexing works as well</span>
+
+<span class="c"># The result is also iterable, ordered in the same order that the tags were passed to with_outputs(), the main tag (if specified) first.</span>
+
+<span class="n">below</span><span class="p">,</span> <span class="n">above</span><span class="p">,</span> <span class="n">marked</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span>
+                        <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
+                            <span class="n">ProcessWords</span><span class="p">(),</span> <span class="n">cutoff_length</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">marker</span><span class="o">=</span><span class="s">'x'</span><span class="p">)</span>
+                        <span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="s">'above_cutoff_lengths'</span><span class="p">,</span>
+                                      <span class="s">'marked strings'</span><span class="p">,</span>
+                                      <span class="n">main</span><span class="o">=</span><span class="s">'below_cutoff_strings'</span><span class="p">))</span>
+</code></pre>
+</div>
+
+<h5 id="emitting-to-side-outputs-in-your-dofn">Emitting to Side Outputs in your DoFn:</h5>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput.</span>
+<span class="c1">// Pass the appropriate TupleTag for the target side output collection when you call ProcessContext.sideOutput.</span>
+<span class="c1">// After your ParDo, extract the resulting main and side output PCollections from the returned PCollectionTuple.</span>
+<span class="c1">// Based on the previous example, this shows the DoFn emitting to the main and side outputs.</span>
+
+  <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+     <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span>
+       <span class="n">String</span> <span class="n">word</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span>
+       <span class="k">if</span> <span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&lt;=</span> <span class="n">wordLengthCutOff</span><span class="o">)</span> <span class="o">{</span>
+         <span class="c1">// Emit this short word to the main output.</span>
+         <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
+       <span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
+         <span class="c1">// Emit this long word's length to a side output.</span>
+         <span class="n">c</span><span class="o">.</span><span class="na">sideOutput</span><span class="o">(</span><span class="n">wordLengthsAboveCutOffTag</span><span class="o">,</span> <span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
+       <span class="o">}</span>
+       <span class="k">if</span> <span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">"MARKER"</span><span class="o">))</span> <span class="o">{</span>
+         <span class="c1">// Emit this word to a different side output.</span>
+         <span class="n">c</span><span class="o">.</span><span class="na">sideOutput</span><span class="o">(</span><span class="n">markedWordsTag</span><span class="o">,</span> <span class="n">word</span><span class="o">);</span>
+       <span class="o">}</span>
+     <span class="o">}}));</span>
+
+</code></pre>
+</div>
+
+<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="c"># Inside your ParDo's DoFn, you can emit an element to a side output by wrapping the value and the output tag (str).</span>
+<span class="c"># using the pvalue.SideOutputValue wrapper class.</span>
+<span class="c"># Based on the previous example, this shows the DoFn emitting to the main and side outputs.</span>
+
+<span class="k">class</span> <span class="nc">ProcessWords</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
+
+  <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">cutoff_length</span><span class="p">,</span> <span class="n">marker</span><span class="p">):</span>
+    <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">context</span><span class="o">.</span><span class="n">element</span><span class="p">)</span> <span class="o">&lt;=</span> <span class="n">cutoff_length</span><span class="p">:</span>
+      <span class="c"># Emit this short word to the main output.</span>
+      <span class="k">yield</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span>
+    <span class="k">else</span><span class="p">:</span>
+      <span class="c"># Emit this word's long length to a side output.</span>
+      <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">SideOutputValue</span><span class="p">(</span>
+          <span class="s">'above_cutoff_lengths'</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">context</span><span class="o">.</span><span class="n">element</span><span class="p">))</span>
+    <span class="k">if</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="n">marker</span><span class="p">):</span>
+      <span class="c"># Emit this word to a different side output.</span>
+      <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">SideOutputValue</span><span class="p">(</span><span class="s">'marked strings'</span><span class="p">,</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span><span class="p">)</span>
+
+
+<span class="c"># Side outputs are also available in Map and FlatMap.</span>
+<span class="c"># Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.</span>
+
+<span class="k">def</span> <span class="nf">even_odd</span><span class="p">(</span><span class="n">x</span><span class="p">):</span>
+  <span class="k">yield</span> <span class="n">pvalue</span><span class="o">.</span><span class="n">SideOutputValue</span><span class="p">(</span><span class="s">'odd'</span> <span class="k">if</span> <span class="n">x</span> <span class="o">%</span> <span class="mi">2</span> <span class="k">else</span> <span class="s">'even'</span><span class="p">,</span> <span class="n">x</span><span class="p">)</span>
+  <span class="k">if</span> <span class="n">x</span> <span class="o">%</span> <span class="mi">10</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+    <span class="k">yield</span> <span class="n">x</span>
+
+<span class="n">results</span> <span class="o">=</span> <span class="n">numbers</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">FlatMap</span><span class="p">(</span><span class="n">even_odd</span><span class="p">)</span><span class="o">.</span><span class="n">with_outputs</span><span class="p">()</span>
+
+<span class="n">evens</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">even</span>
+<span class="n">odds</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">odd</span>
+<span class="n">tens</span> <span class="o">=</span> <span class="n">results</span><span class="p">[</span><span class="bp">None</span><span class="p">]</span>  <span class="c"># the undeclared main output</span>
+</code></pre>
+</div>
+
 <p><a name="io"></a>
 <a name="running"></a>
 <a name="transforms-composite"></a>
-<a name="transforms-sideio"></a>
 <a name="coders"></a>
 <a name="windowing"></a>
 <a name="triggers"></a></p>