Posted to commits@beam.apache.org by da...@apache.org on 2017/01/24 18:28:49 UTC

[2/4] beam-site git commit: Replace literal code samples with extracted, tested snippets.

Replace literal code samples with extracted, tested snippets.


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b8673dde
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b8673dde
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b8673dde

Branch: refs/heads/asf-site
Commit: b8673ddee75cbdf92067d5787426c0a68a681d02
Parents: 0b44ef7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jan 20 17:55:13 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 17:55:13 2017 -0800

----------------------------------------------------------------------
 src/documentation/programming-guide.md | 115 ++++++----------------------
 1 file changed, 22 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/b8673dde/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index d743c49..869d9db 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -333,9 +333,8 @@ class ComputeWordLengthFn(beam.DoFn):
     # Use return to emit the output element.
     return [len(word)]
 
-# Apply a ParDo to the PCollection "words" to compute lengths for each word.
-word_lengths = words | beam.ParDo(ComputeWordLengthFn())
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_apply
+%}```
 
 In the example, our input `PCollection` contains `String` values. We apply a `ParDo` transform that specifies a function (`ComputeWordLengthFn`) to compute the length of each string; the transform outputs the results to a new `PCollection` of `Integer` values that stores the length of each word.
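
For readers of the change, a minimal runnable reconstruction of the sample the new `github_sample` tag pulls in, based on the removed literal lines above; the pipeline scaffolding and input data are illustrative assumptions for the 2017-era Beam Python SDK:

```py
import apache_beam as beam

class ComputeWordLengthFn(beam.DoFn):
  def process(self, word):
    # Use return to emit the output element.
    return [len(word)]

p = beam.Pipeline()
words = p | beam.Create(['hello', 'beam'])
# Apply a ParDo to the PCollection "words" to compute lengths for each word.
word_lengths = words | beam.ParDo(ComputeWordLengthFn())
p.run()
```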
 
@@ -420,8 +419,8 @@ words = ...
 
 # Apply a lambda function to the PCollection words.
 # Save the result as the PCollection word_lengths.
-word_lengths = words | beam.FlatMap(lambda x: [len(x)])
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_using_flatmap
+%}```
 
 If your `ParDo` performs a one-to-one mapping of input elements to output elements--that is, for each input element, it applies a function that produces *exactly one* output element--you can use the higher-level <span class="language-java">`MapElements`</span><span class="language-py">`Map`</span> transform. <span class="language-java">`MapElements` can accept an anonymous Java 8 lambda function for additional brevity.</span>
 
@@ -444,8 +443,8 @@ words = ...
 
 # Apply a Map with a lambda function to the PCollection words.
 # Save the result as the PCollection word_lengths.
-word_lengths = words | beam.Map(lambda x: len(x))
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_using_map
+%}```
 
 {:.language-java}
 > **Note:** You can use Java 8 lambda functions with several other Beam transforms, including `Filter`, `FlatMapElements`, and `Partition`.
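
The two hunks above swap in tags for the `FlatMap` and `Map` samples; side by side, a minimal sketch (scaffolding assumed) of the distinction the surrounding text draws:

```py
import apache_beam as beam

p = beam.Pipeline()
words = p | beam.Create(['hello', 'beam'])

# FlatMap: the callable may return zero or more outputs per input,
# so the single length is wrapped in a list.
word_lengths_flat = words | beam.FlatMap(lambda x: [len(x)])

# Map: strictly one output per input, so no wrapping list is needed.
word_lengths = words | beam.Map(lambda x: len(x))
p.run()
```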
@@ -517,10 +516,8 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
 ```
 
 ```py
-# A bounded sum of positive integers.
-def bounded_sum(values, bound=500):
-  return min(sum(values), bound)
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_bounded_sum
+%}```
 
 ##### **Advanced Combinations using CombineFn**
 
@@ -574,20 +571,8 @@ public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
 
 ```py
 pc = ...
-class AverageFn(beam.CombineFn):
-  def create_accumulator(self):
-    return (0.0, 0)
-
-  def add_input(self, (sum, count), input):
-    return sum + input, count + 1
-
-  def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
-    return sum(sums), sum(counts)
-
-  def extract_output(self, (sum, count)):
-    return sum / count if count else float('NaN')
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_custom_average
+%}```
 
 If you are combining a `PCollection` of key-value pairs, [per-key combining](#transforms-combine-per-key) is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy.
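
As a hedged illustration of key-dependent combining that does not rely on `KeyedCombineFn`'s exact Python signature, the same effect can be sketched with `GroupByKey` plus a key-aware reducer; the MIN/MAX policy and key data here are hypothetical:

```py
import apache_beam as beam

def combine_by_key(kv):
  key, values = kv
  # Hypothetical policy: MIN for keys starting with 'a', MAX otherwise.
  return key, (min(values) if key.startswith('a') else max(values))

p = beam.Pipeline()
result = (p
          | beam.Create([('alice', 3), ('alice', 1), ('bob', 2), ('bob', 5)])
          | beam.GroupByKey()
          | beam.Map(combine_by_key))
p.run()
```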
 
@@ -827,40 +812,14 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
 # For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable of the actual elements of pcoll being passed into each process invocation.
 # In this example, side inputs are passed to a FlatMap transform as extra arguments and consumed by filter_using_length.
 
-# Callable takes additional arguments.
-def filter_using_length(word, lower_bound, upper_bound=float('inf')):
-  if lower_bound <= len(word) <= upper_bound:
-    yield word
-
-# Construct a deferred side input.
-avg_word_len = (words
-                | beam.Map(len)
-                | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
-
-# Call with explicit side inputs.
-small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
-
-# A single deferred side input.
-larger_than_average = (words | 'large' >> beam.FlatMap(
-    filter_using_length,
-    lower_bound=pvalue.AsSingleton(avg_word_len)))
-
-# Mix and match.
-small_but_nontrivial = words | beam.FlatMap(filter_using_length,
-                                            lower_bound=2,
-                                            upper_bound=pvalue.AsSingleton(
-                                                avg_word_len))
-
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_side_input
+%}
 
 # We can also pass side inputs to a ParDo transform, which will get passed to its process method.
 # The only change is that the first arguments are self and a context, rather than the PCollection element itself.
 
-class FilterUsingLength(beam.DoFn):
-  def process(self, context, lower_bound, upper_bound=float('inf')):
-    if lower_bound <= len(context.element) <= upper_bound:
-      yield context.element
-
-small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_side_input_dofn
+%}
 ...
 
 ```
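
A runnable condensation of the removed side-input sample, assuming the 2017-era SDK; `pvalue.AsSingleton` turns a one-element `PCollection` into a deferred value that is resolved at execution time:

```py
import apache_beam as beam
from apache_beam import pvalue

# Callable takes additional arguments.
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
  if lower_bound <= len(word) <= upper_bound:
    yield word

p = beam.Pipeline()
words = p | beam.Create(['a', 'bb', 'ccc', 'dddd'])

# Construct a deferred side input: the average word length.
avg_word_len = (words
                | beam.Map(len)
                | beam.CombineGlobally(beam.combiners.MeanCombineFn()))

# Call with explicit (non-deferred) arguments.
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)

# Pass the deferred side input as an extra keyword argument.
larger_than_average = words | 'large' >> beam.FlatMap(
    filter_using_length, lower_bound=pvalue.AsSingleton(avg_word_len))
p.run()
```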
@@ -935,22 +894,13 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
 # with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object.
 # The tags give access to the corresponding output PCollections.
 
-results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
-           .with_outputs('above_cutoff_lengths', 'marked strings',
-                         main='below_cutoff_strings'))
-below = results.below_cutoff_strings
-above = results.above_cutoff_lengths
-marked = results['marked strings']  # indexing works as well
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs
+%}
 
 # The result is also iterable, ordered in the same order that the tags were passed to with_outputs(), the main tag (if specified) first.
 
-below, above, marked = (words
-                        | beam.ParDo(
-                            ProcessWords(), cutoff_length=2, marker='x')
-                        .with_outputs('above_cutoff_lengths',
-                                      'marked strings',
-                                      main='below_cutoff_strings'))
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_iter
+%}```
 
 ##### Emitting to Side Outputs in your DoFn:
 
@@ -983,35 +933,14 @@ below, above, marked = (words
 # using the pvalue.SideOutputValue wrapper class.
 # Based on the previous example, this shows the DoFn emitting to the main and side outputs.
 
-class ProcessWords(beam.DoFn):
-
-  def process(self, context, cutoff_length, marker):
-    if len(context.element) <= cutoff_length:
-      # Emit this short word to the main output.
-      yield context.element
-    else:
-      # Emit this word's long length to a side output.
-      yield pvalue.SideOutputValue(
-          'above_cutoff_lengths', len(context.element))
-    if context.element.startswith(marker):
-      # Emit this word to a different side output.
-      yield pvalue.SideOutputValue('marked strings', context.element)
-
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_emitting_values_on_side_outputs
+%}
 
 # Side outputs are also available in Map and FlatMap.
 # Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.
 
-def even_odd(x):
-  yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
-  if x % 10 == 0:
-    yield x
-
-results = numbers | beam.FlatMap(even_odd).with_outputs()
-
-evens = results.even
-odds = results.odd
-tens = results[None]  # the undeclared main output
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_undeclared
+%}```
 
 <a name="io"></a>
 <a name="running"></a>