Posted to commits@beam.apache.org by da...@apache.org on 2017/01/24 18:28:49 UTC
[2/4] beam-site git commit: Replace literal code samples with extracted, tested snippets.
Replace literal code samples with extracted, tested snippets.
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b8673dde
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b8673dde
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b8673dde
Branch: refs/heads/asf-site
Commit: b8673ddee75cbdf92067d5787426c0a68a681d02
Parents: 0b44ef7
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jan 20 17:55:13 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jan 20 17:55:13 2017 -0800
----------------------------------------------------------------------
src/documentation/programming-guide.md | 115 ++++++----------------------
1 file changed, 22 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam-site/blob/b8673dde/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index d743c49..869d9db 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -333,9 +333,8 @@ class ComputeWordLengthFn(beam.DoFn):
# Use return to emit the output element.
return [len(word)]
-# Apply a ParDo to the PCollection "words" to compute lengths for each word.
-word_lengths = words | beam.ParDo(ComputeWordLengthFn())
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_apply
+%}```
In the example, our input `PCollection` contains `String` values. We apply a `ParDo` transform that specifies a function (`ComputeWordLengthFn`) to compute the length of each string and output the result to a new `PCollection` of `Integer` values that stores the length of each word.
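For readers without the extracted snippet at hand, here is a minimal, self-contained sketch of the same `ParDo` pattern. It is written against a current Beam Python SDK (where `DoFn.process` receives the element directly rather than a context object); the pipeline and input values are illustrative, not part of the original sample.

```py
import apache_beam as beam

class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    # Use return (or yield) to emit the output element.
    return [len(element)]

with beam.Pipeline() as p:
  words = p | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
  # Apply a ParDo to the PCollection "words" to compute lengths for each word.
  word_lengths = words | beam.ParDo(ComputeWordLengthFn())
```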
@@ -420,8 +419,8 @@ words = ...
# Apply a lambda function to the PCollection words.
# Save the result as the PCollection word_lengths.
-word_lengths = words | beam.FlatMap(lambda x: [len(x)])
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_using_flatmap
+%}```
If your `ParDo` performs a one-to-one mapping of input elements to output elements--that is, for each input element, it applies a function that produces *exactly one* output element--you can use the higher-level <span class="language-java">`MapElements`</span><span class="language-py">`Map`</span> transform. <span class="language-java">`MapElements` can accept an anonymous Java 8 lambda function for additional brevity.</span>
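As a quick illustration of the one-to-one case, a sketch of `Map` (current SDK; the pipeline and input values are illustrative):

```py
import apache_beam as beam

with beam.Pipeline() as p:
  words = p | beam.Create(['to', 'be', 'or', 'not'])
  # Map applies a one-to-one function; here, each word maps to its length.
  word_lengths = words | beam.Map(len)
```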
@@ -444,8 +443,8 @@ words = ...
# Apply a Map with a lambda function to the PCollection words.
# Save the result as the PCollection word_lengths.
-word_lengths = words | beam.Map(lambda x: len(x))
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_using_map
+%}```
{:.language-java}
> **Note:** You can use Java 8 lambda functions with several other Beam transforms, including `Filter`, `FlatMapElements`, and `Partition`.
@@ -517,10 +516,8 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
```
```py
-# A bounded sum of positive integers.
-def bounded_sum(values, bound=500):
- return min(sum(values), bound)
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_bounded_sum
+%}```
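A sketch of how a plain function like the replaced `bounded_sum` is applied as a combine. The apply itself is an assumption based on the standard `CombineGlobally` API, which accepts a callable over an iterable of values:

```py
import apache_beam as beam

def bounded_sum(values, bound=500):
  # A bounded sum of positive integers.
  return min(sum(values), bound)

with beam.Pipeline() as p:
  pc = p | beam.Create([1, 10, 100, 1000])
  # CombineGlobally accepts a plain callable; extra arguments such as bound
  # can also be passed, e.g. beam.CombineGlobally(bounded_sum, bound=5000).
  small_sum = pc | beam.CombineGlobally(bounded_sum)  # -> [500]
```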
##### **Advanced Combinations using CombineFn**
@@ -574,20 +571,8 @@ public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
```py
pc = ...
-class AverageFn(beam.CombineFn):
- def create_accumulator(self):
- return (0.0, 0)
-
- def add_input(self, (sum, count), input):
- return sum + input, count + 1
-
- def merge_accumulators(self, accumulators):
- sums, counts = zip(*accumulators)
- return sum(sums), sum(counts)
-
- def extract_output(self, (sum, count)):
- return sum / count if count else float('NaN')
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_custom_average
+%}```
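For completeness, a Python 3-compatible sketch of the replaced `AverageFn` together with its apply (the original used Python 2 tuple-parameter unpacking in its method signatures, which later Python versions reject; the `CombineGlobally` apply is an assumption based on the standard API):

```py
import apache_beam as beam

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    # The accumulator is a (sum, count) pair.
    return (0.0, 0)

  def add_input(self, accumulator, input):
    (sum_, count) = accumulator
    return sum_ + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, accumulator):
    (sum_, count) = accumulator
    return sum_ / count if count else float('NaN')

with beam.Pipeline() as p:
  pc = p | beam.Create([1, 2, 3, 4])
  average = pc | beam.CombineGlobally(AverageFn())  # -> [2.5]
```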
If you are combining a `PCollection` of key-value pairs, [per-key combining](#transforms-combine-per-key) is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy.
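For the common per-key case mentioned above, a minimal sketch using `CombinePerKey` (this shows only the plain per-key combine, not the key-aware `KeyedCombineFn` variant):

```py
import apache_beam as beam

with beam.Pipeline() as p:
  scores = p | beam.Create([('user1', 5), ('user1', 20), ('user2', 10)])
  # Combine values per key: each user's scores are summed independently.
  totals = scores | beam.CombinePerKey(sum)  # -> [('user1', 25), ('user2', 10)]
```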
@@ -827,40 +812,14 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
# For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable of the actual elements of pcoll being passed into each process invocation.
# In this example, side inputs are passed to a FlatMap transform as extra arguments and consumed by filter_using_length.
-# Callable takes additional arguments.
-def filter_using_length(word, lower_bound, upper_bound=float('inf')):
- if lower_bound <= len(word) <= upper_bound:
- yield word
-
-# Construct a deferred side input.
-avg_word_len = (words
- | beam.Map(len)
- | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
-
-# Call with explicit side inputs.
-small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
-
-# A single deferred side input.
-larger_than_average = (words | 'large' >> beam.FlatMap(
- filter_using_length,
- lower_bound=pvalue.AsSingleton(avg_word_len)))
-
-# Mix and match.
-small_but_nontrivial = words | beam.FlatMap(filter_using_length,
- lower_bound=2,
- upper_bound=pvalue.AsSingleton(
- avg_word_len))
-
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_side_input
+%}
# We can also pass side inputs to a ParDo transform, which will get passed to its process method.
# The only change is that the first arguments are self and a context, rather than the PCollection element itself.
-class FilterUsingLength(beam.DoFn):
- def process(self, context, lower_bound, upper_bound=float('inf')):
- if lower_bound <= len(context.element) <= upper_bound:
- yield context.element
-
-small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_side_input_dofn
+%}
...
```
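A condensed, runnable sketch of the side-input pattern from the replaced sample, written against a current SDK (where `FlatMap` callables receive the element directly); the input values are illustrative:

```py
import apache_beam as beam
from apache_beam import pvalue

def filter_using_length(word, lower_bound, upper_bound=float('inf')):
  if lower_bound <= len(word) <= upper_bound:
    yield word

with beam.Pipeline() as p:
  words = p | beam.Create(['a', 'bb', 'ccc', 'dddd'])
  # A deferred side input: the mean word length, computed by the pipeline itself.
  avg_word_len = (words
                  | beam.Map(len)
                  | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
  # AsSingleton defers reading the side input until execution time.
  larger_than_average = words | beam.FlatMap(
      filter_using_length,
      lower_bound=pvalue.AsSingleton(avg_word_len))
```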
@@ -935,22 +894,13 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
# with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object.
# The tags give access to the corresponding output PCollections.
-results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
- .with_outputs('above_cutoff_lengths', 'marked strings',
- main='below_cutoff_strings'))
-below = results.below_cutoff_strings
-above = results.above_cutoff_lengths
-marked = results['marked strings'] # indexing works as well
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs
+%}
# The result is also iterable, ordered in the same order that the tags were passed to with_outputs(), the main tag (if specified) first.
-below, above, marked = (words
- | beam.ParDo(
- ProcessWords(), cutoff_length=2, marker='x')
- .with_outputs('above_cutoff_lengths',
- 'marked strings',
- main='below_cutoff_strings'))
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_iter
+%}```
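A self-contained sketch of the tagged-output pattern, written against a current SDK. Note one assumption: current SDKs name the wrapper `pvalue.TaggedOutput`, whereas the snippet of this commit's era used `pvalue.SideOutputValue`.

```py
import apache_beam as beam
from apache_beam import pvalue

class ProcessWords(beam.DoFn):
  def process(self, element, cutoff_length, marker):
    if len(element) <= cutoff_length:
      # Emit this short word to the main output.
      yield element
    else:
      # Emit this word's long length to a tagged output.
      yield pvalue.TaggedOutput('above_cutoff_lengths', len(element))
    if element.startswith(marker):
      # Emit this word to a different tagged output.
      yield pvalue.TaggedOutput('marked strings', element)

with beam.Pipeline() as p:
  words = p | beam.Create(['a', 'an', 'xyz', 'word'])
  results = (words
             | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
               .with_outputs('above_cutoff_lengths',
                             'marked strings',
                             main='below_cutoff_strings'))
  below = results.below_cutoff_strings
  above = results.above_cutoff_lengths
  marked = results['marked strings']  # indexing works as well
```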
##### Emitting to Side Outputs in your DoFn:
@@ -983,35 +933,14 @@ below, above, marked = (words
# using the pvalue.SideOutputValue wrapper class.
# Based on the previous example, this shows the DoFn emitting to the main and side outputs.
-class ProcessWords(beam.DoFn):
-
- def process(self, context, cutoff_length, marker):
- if len(context.element) <= cutoff_length:
- # Emit this short word to the main output.
- yield context.element
- else:
- # Emit this word's long length to a side output.
- yield pvalue.SideOutputValue(
- 'above_cutoff_lengths', len(context.element))
- if context.element.startswith(marker):
- # Emit this word to a different side output.
- yield pvalue.SideOutputValue('marked strings', context.element)
-
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_emitting_values_on_side_outputs
+%}
# Side outputs are also available in Map and FlatMap.
# Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.
-def even_odd(x):
- yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
- if x % 10 == 0:
- yield x
-
-results = numbers | beam.FlatMap(even_odd).with_outputs()
-
-evens = results.even
-odds = results.odd
-tens = results[None] # the undeclared main output
-```
+{% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_undeclared
+%}```
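Likewise, a sketch of the undeclared-tags variant against a current SDK (again substituting `pvalue.TaggedOutput` for the era's `pvalue.SideOutputValue`):

```py
import apache_beam as beam
from apache_beam import pvalue

def even_odd(x):
  # Route each number to an 'odd' or 'even' tagged output.
  yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
  if x % 10 == 0:
    # Multiples of ten also go to the (undeclared) main output.
    yield x

with beam.Pipeline() as p:
  numbers = p | beam.Create([1, 2, 3, 10])
  # No tags are declared ahead of time; with_outputs() discovers them at runtime.
  results = numbers | beam.FlatMap(even_odd).with_outputs()
  evens = results.even
  odds = results.odd
  tens = results[None]  # the undeclared main output
```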
<a name="io"></a>
<a name="running"></a>