Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:26:31 UTC

[2/2] incubator-beam git commit: Update Python examples

Update Python examples

* style
* updates to backwards-incompatible changes
* less Dataflow, more Beam
* ...


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f47f71a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f47f71a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f47f71a2

Branch: refs/heads/python-sdk
Commit: f47f71a280366e639e7ea35f59a6ea396dacfb56
Parents: dbba2d3
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Sat Aug 27 23:37:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:26:21 2016 -0700

----------------------------------------------------------------------
 sdks/python/README.md | 232 ++++++++++++++++++++++-----------------------
 1 file changed, 116 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f47f71a2/sdks/python/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/README.md b/sdks/python/README.md
index fb89d50..dca9517 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -16,6 +16,7 @@
     specific language governing permissions and limitations
     under the License.
 -->
+== This page is currently being updated. ==
 
 # Cloud Dataflow SDK for Python
 
@@ -41,11 +42,11 @@ from the Python programming language.
           * [Notes on installing with ``setup.py install``](#notes-on-installing-with-setuppy-install)
   * [Local execution of a pipeline](#local-execution-of-a-pipeline)
   * [A Quick Tour of the Source Code](#a-quick-tour-of-the-source-code)
-  * [Some Simple Examples](#some-simple-examples)
-      * [Hello world](#hello-world)
-      * [Hello world (with Map)](#hello-world-with-map)
-      * [Hello world (with FlatMap)](#hello-world-with-flatmap)
-      * [Hello world (with FlatMap and yield)](#hello-world-with-flatmap-and-yield)
+  * [Simple Examples](#simple-examples)
+      * [Basic pipeline](#basic-pipeline)
+      * [Basic pipeline (with Map)](#basic-pipeline-with-map)
+      * [Basic pipeline (with FlatMap)](#basic-pipeline-with-flatmap)
+      * [Basic pipeline (with FlatMap and yield)](#basic-pipeline-with-flatmap-and-yield)
       * [Counting words](#counting-words)
       * [Counting words with GroupByKey](#counting-words-with-groupbykey)
       * [Type hints](#type-hints)
@@ -255,215 +256,214 @@ Some interesting classes to navigate to:
 * combiners, in file
 [`google/cloud/dataflow/transforms/combiners.py`](http://localhost:8888/google.cloud.dataflow.transforms.combiners.html)
 
-## Some Simple Examples
+## Simple Examples
 
-### Hello world
+### Basic pipeline
 
-Create a transform from an iterable and use the pipe operator to chain
-transforms:
+A basic pipeline takes an iterable as input, applies the
+`beam.Create` `PTransform`, and produces a `PCollection` that can
+be written to a file or modified by further `PTransform`s. The
+pipe operator is used to chain `PTransform`s.
 
 ```python
 # Standard imports
-import google.cloud.dataflow as df
+import apache_beam as beam
 # Create a pipeline executing on a direct runner (local, non-cloud).
-p = df.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectPipelineRunner')
 # Create a PCollection with names and write it to a file.
 (p
- | df.Create('add names', ['Ann', 'Joe'])
- | df.Write('save', df.io.TextFileSink('./names')))
+ | 'add names' >> beam.Create(['Ann', 'Joe'])
+ | 'save' >> beam.io.Write(beam.io.TextFileSink('./names')))
 # Execute the pipeline.
 p.run()
 ```
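+
+As a quick check (a sketch, not part of the example above; the exact shard
+naming of `TextFileSink` output is not specified here, so a glob is used),
+the written file(s) can be inspected locally:
+
+```python
+# Print the contents of the file(s) written by the './names' sink above.
+# Assumption: the sink writes one or more local files whose names start with 'names'.
+import glob
+
+for path in glob.glob('./names*'):
+    with open(path) as f:
+        print(f.read())
+```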
 
-### Hello world (with Map)
+### Basic pipeline (with Map)
 
-The `Map` transform takes a callable, which will be applied to each
+The `Map` `PTransform` takes a callable, which will be applied to each
 element of the input `PCollection` and must return an element to go
 into the output `PCollection`.
 
 ```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Read file with names, add a greeting for each, and write results.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add a greeting to each name, and write to a file.
 (p
- | df.Read('load messages', df.io.TextFileSource('./names'))
- | df.Map('add greeting',
-          lambda name, msg: '%s %s!' % (msg, name),
-          'Hello')
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
 p.run()
 ```
 
-### Hello world (with FlatMap)
+### Basic pipeline (with FlatMap)
 
 A `FlatMap` is like a `Map` except its callable returns a (possibly
 empty) iterable of elements for the output `PCollection`.
 
 ```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Read previous file, add a name to each greeting and write results.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add two greetings to each name, and write to a file.
 (p
- | df.Read('load messages', df.io.TextFileSource('./names'))
- | df.FlatMap('add greetings',
-              lambda name, msgs: ['%s %s!' % (m, name) for m in msgs],
-              ['Hello', 'Hola'])
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greetings' >> beam.FlatMap(
+    lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
+    ['Hello', 'Hola'])
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
 p.run()
 ```
 
-### Hello world (with FlatMap and yield)
+### Basic pipeline (with FlatMap and yield)
 
 The callable of a `FlatMap` can be a generator, that is,
 a function using `yield`.
 
 ```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Add greetings using a FlatMap function using yield.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add two greetings to each name
+# (with FlatMap using a yield generator), and write to a file.
 def add_greetings(name, messages):
-  for m in messages:
-    yield '%s %s!' % (m, name)
+  for msg in messages:
+    yield '%s %s!' % (msg, name)
 
 (p
- | df.Read('load names', df.io.TextFileSource('./names'))
- | df.FlatMap('greet', add_greetings, ['Hello', 'Hola'])
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
 p.run()
 ```
 
 ### Counting words
 
-This example counts the words in a text and also shows how to read a
-text file from [Google Cloud Storage](https://cloud.google.com/storage/).
+This example shows how to read a text file from
+[Google Cloud Storage](https://cloud.google.com/storage/)
+and count its words.
 
 ```python
 import re
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
 (p
- | df.Read('read',
-           df.io.TextFileSource(
-           'gs://dataflow-samples/shakespeare/kinglear.txt'))
- | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
- | df.combiners.Count.PerElement('count words')
- | df.Write('write', df.io.TextFileSink('./results')))
+ | 'read' >> beam.Read(
+    beam.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt'))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
+ | 'count words' >> beam.combiners.Count.PerElement()
+ | 'save' >> beam.Write(beam.io.TextFileSink('./word_count')))
 p.run()
 ```
 
 ### Counting words with GroupByKey
 
-Here we use `GroupByKey` to count the words.
-This is a somewhat forced example of `GroupByKey`; normally one would use
-the transform `df.combiners.Count.PerElement`, as in the previous example.
-The example also shows the use of a wild-card in specifying the text file
-source.
-
+This is a somewhat forced example of `GroupByKey`: it achieves the same
+functionality as the previous example without using
+`beam.combiners.Count.PerElement`. It also demonstrates the use of a
+wildcard to specify the text file source.
+
 ```python
 import re
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-class MyCountTransform(df.PTransform):
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+class MyCountTransform(beam.PTransform):
   def apply(self, pcoll):
     return (pcoll
-    | df.Map('one word', lambda w: (w, 1))
-    # GroupByKey accepts a PCollection of (w, 1) and
-    # outputs a PCollection of (w, (1, 1, ...))
-    | df.GroupByKey('group words')
-    | df.Map('count words', lambda (word, counts): (word, len(counts))))
+            | 'one word' >> beam.Map(lambda word: (word, 1))
+            # GroupByKey accepts a PCollection of (word, 1) elements and
+            # outputs a PCollection of (word, [1, 1, ...])
+            | 'group words' >> beam.GroupByKey()
+            | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))))
 
 (p
- | df.Read('read', df.io.TextFileSource('./names*'))
- | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
+ | 'read' >> beam.Read(beam.io.TextFileSource('./names*'))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
  | MyCountTransform()
- | df.Write('write', df.io.TextFileSink('./results')))
+ | 'write' >> beam.Write(beam.io.TextFileSink('./word_count')))
 p.run()
 ```
 
 ### Type hints
 
-In some cases, you can improve the efficiency of the data encoding by providing
-type hints.  For example:
+In some cases, providing type hints can improve the efficiency
+of the data encoding.
 
 ```python
-import google.cloud.dataflow as df
-from google.cloud.dataflow.typehints import typehints
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+from apache_beam.typehints import typehints
+p = beam.Pipeline('DirectPipelineRunner')
 (p
- | df.Read('A', df.io.TextFileSource('./names'))
- | df.Map('B1', lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
- | df.GroupByKey('GBK')
- | df.Write('C', df.io.TextFileSink('./results')))
+ | 'read' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
+ | 'group words' >> beam.GroupByKey()
+ | 'save' >> beam.Write(beam.io.TextFileSink('./typed_names')))
 p.run()
 ```
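+
+As a further illustration (a sketch; the use of `with_input_types` below is an
+assumption based on the type-hints API and is not taken from the example above),
+input types can be declared in the same way as output types:
+
+```python
+import apache_beam as beam
+from apache_beam.typehints import typehints
+
+p = beam.Pipeline('DirectPipelineRunner')
+(p
+ | 'read' >> beam.Read(beam.io.TextFileSource('./names'))
+ # Declare both the input type (str) and the output type (KV[str, int]).
+ | 'pair with length' >> (beam.Map(lambda name: (name, len(name)))
+                          .with_input_types(str)
+                          .with_output_types(typehints.KV[str, int]))
+ | 'group' >> beam.GroupByKey()
+ | 'save' >> beam.Write(beam.io.TextFileSink('./name_lengths')))
+p.run()
+```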
 
 ### BigQuery
 
-Here is a pipeline that reads input from a BigQuery table and writes the result
-to a different table. This example calculates the number of tornadoes per month
-from weather data. To run it you will need to provide an output table that
-you can write to.
+This example calculates the number of tornadoes per month from weather data.
+The input is read from a BigQuery table, and the output is written to a
+different table; the destination table and the target project must be
+specified by the user.
 
 ```python
-import google.cloud.dataflow as df
+import apache_beam as beam
+project = 'DESTINATION-PROJECT-ID'
 input_table = 'clouddataflow-readonly:samples.weather_stations'
-project = 'YOUR-PROJECT'
-output_table = 'DATASET.TABLENAME'
-p = df.Pipeline(argv=['--project', project])
+output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
+
+p = beam.Pipeline(argv=['--project', project])
 (p
- | df.Read('read', df.io.BigQuerySource(input_table))
- | df.FlatMap(
-     'months with tornadoes',
-     lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
- | df.CombinePerKey('monthly count', sum)
- | df.Map('format', lambda (k, v): {'month': k, 'tornado_count': v})
- | df.Write('write', df.io.BigQuerySink(
-      output_table,
-      schema='month:INTEGER, tornado_count:INTEGER',
-      create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
-      write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE)))
+ | 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
+ | 'months with tornadoes' >> beam.FlatMap(
+    lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
+ | 'monthly count' >> beam.CombinePerKey(sum)
+ | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})
+ | 'save' >> beam.Write(
+    beam.io.BigQuerySink(
+        output_table,
+        schema='month:INTEGER, tornado_count:INTEGER',
+        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
 p.run()
 ```
 
-Here is a pipeline that achieves the same functionality, i.e., calculates the
-number of tornadoes per month, but uses a query to filter out input instead
-of using the whole table.
+This pipeline also calculates the number of tornadoes per month, but it uses
+a query to filter the input instead of reading the whole table.
 
 ```python
-import google.cloud.dataflow as df
-project = 'YOUR-PROJECT'
-output_table = 'DATASET.TABLENAME'
+import apache_beam as beam
+project = 'DESTINATION-PROJECT-ID'
+output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
 input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
         'FROM [clouddataflow-readonly:samples.weather_stations] ' \
         'WHERE tornado=true GROUP BY month'
-p = df.Pipeline(argv=['--project', project])
+p = beam.Pipeline(argv=['--project', project])
 (p
-| df.Read('read', df.io.BigQuerySource(query=input_query))
-| df.Write('write', df.io.BigQuerySink(
+ | 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
+ | 'save' >> beam.Write(beam.io.BigQuerySink(
     output_table,
     schema='month:INTEGER, tornado_count:INTEGER',
-    create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
-    write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE)))
+    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
 p.run()
 ```
 
 ### Combiner Examples
 
-A common case for Dataflow combiners is to sum (or max or min) over the values
-of each key. Such standard Python functions can be used directly as combiner
-functions. In fact, any function "reducing" an iterable to a single value can be
-used.
+Combiners are used to create a `PCollection` that contains, for each key
+in the initial `PCollection`, a single combined value such as the sum
+(or max, or min) of that key's values. Standard Python functions such as
+`sum`, `max`, and `min` can be used directly as combiner functions; in
+fact, any function "reducing" an iterable to a single value can be used.
 
 ```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
 
 SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
 
 (p
- | df.Create(SAMPLE_DATA)
- | df.CombinePerKey(sum)
- | df.Write(df.io.TextFileSink('./results')))
+ | beam.Create(SAMPLE_DATA)
+ | beam.CombinePerKey(sum)
+ | beam.Write(beam.io.TextFileSink('./sums')))
 p.run()
 ```
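+
+For instance (a sketch using the same `SAMPLE_DATA`; only the combiner
+function changes), the built-in `max` gives the largest value per key
+instead of the sum:
+
+```python
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+
+SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
+
+(p
+ | beam.Create(SAMPLE_DATA)
+ | beam.CombinePerKey(max)  # yields ('a', 3) and ('b', 20)
+ | beam.Write(beam.io.TextFileSink('./maxima')))
+p.run()
+```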