Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:26:31 UTC
[2/2] incubator-beam git commit: Update Python examples
Update Python examples
* style
* updates to backwards-incompatible changes
* less Dataflow, more Beam
* ...
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f47f71a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f47f71a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f47f71a2
Branch: refs/heads/python-sdk
Commit: f47f71a280366e639e7ea35f59a6ea396dacfb56
Parents: dbba2d3
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Sat Aug 27 23:37:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Sep 2 10:26:21 2016 -0700
----------------------------------------------------------------------
sdks/python/README.md | 232 ++++++++++++++++++++++-----------------------
1 file changed, 116 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f47f71a2/sdks/python/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/README.md b/sdks/python/README.md
index fb89d50..dca9517 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -16,6 +16,7 @@
specific language governing permissions and limitations
under the License.
-->
+== This page is currently being updated. ==
# Cloud Dataflow SDK for Python
@@ -41,11 +42,11 @@ from the Python programming language.
* [Notes on installing with ``setup.py install``](#notes-on-installing-with-setuppy-install)
* [Local execution of a pipeline](#local-execution-of-a-pipeline)
* [A Quick Tour of the Source Code](#a-quick-tour-of-the-source-code)
- * [Some Simple Examples](#some-simple-examples)
- * [Hello world](#hello-world)
- * [Hello world (with Map)](#hello-world-with-map)
- * [Hello world (with FlatMap)](#hello-world-with-flatmap)
- * [Hello world (with FlatMap and yield)](#hello-world-with-flatmap-and-yield)
+ * [Simple Examples](#simple-examples)
+ * [Basic pipeline](#basic-pipeline)
+ * [Basic pipeline (with Map)](#basic-pipeline-with-map)
+ * [Basic pipeline (with FlatMap)](#basic-pipeline-with-flatmap)
+ * [Basic pipeline (with FlatMap and yield)](#basic-pipeline-with-flatmap-and-yield)
* [Counting words](#counting-words)
* [Counting words with GroupByKey](#counting-words-with-groupbykey)
* [Type hints](#type-hints)
@@ -255,215 +256,214 @@ Some interesting classes to navigate to:
* combiners, in file
[`google/cloud/dataflow/transforms/combiners.py`](http://localhost:8888/google.cloud.dataflow.transforms.combiners.html)
-## Some Simple Examples
+## Simple Examples
-### Hello world
+### Basic pipeline
-Create a transform from an iterable and use the pipe operator to chain
-transforms:
+A basic pipeline takes an iterable as input, applies the
+beam.Create `PTransform`, and produces a `PCollection` that can
+be written to a file or modified by further `PTransform`s. The
+pipe operator chains `PTransform`s together.
```python
# Standard imports
-import google.cloud.dataflow as df
+import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
-p = df.Pipeline('DirectPipelineRunner')
+p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
- | df.Create('add names', ['Ann', 'Joe'])
- | df.Write('save', df.io.TextFileSink('./names')))
+ | 'add names' >> beam.Create(['Ann', 'Joe'])
+ | 'save' >> beam.io.Write(beam.io.TextFileSink('./names')))
# Execute the pipeline.
p.run()
```
-### Hello world (with Map)
+### Basic pipeline (with Map)
-The `Map` transform takes a callable, which will be applied to each
+The `Map` `PTransform` takes a callable, which will be applied to each
element of the input `PCollection` and must return an element to go
into the output `PCollection`.
```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Read file with names, add a greeting for each, and write results.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add a greeting to each name, and write to a file.
(p
- | df.Read('load messages', df.io.TextFileSource('./names'))
- | df.Map('add greeting',
- lambda name, msg: '%s %s!' % (msg, name),
- 'Hello')
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
p.run()
```
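[Editor's note: `Map`'s one-in, one-out contract can be sketched in plain Python, without Beam. The `map_transform` helper below is illustrative only, not part of the Beam API; extra positional arguments are passed through to the callable, as with the `'Hello'` side argument in the example above.]

```python
def map_transform(fn, elements, *args):
    # Apply fn to each element; extra args are passed through unchanged.
    return [fn(elem, *args) for elem in elements]

greetings = map_transform(
    lambda name, msg: '%s, %s!' % (msg, name), ['Ann', 'Joe'], 'Hello')
# greetings == ['Hello, Ann!', 'Hello, Joe!']
```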
-### Hello world (with FlatMap)
+### Basic pipeline (with FlatMap)
A `FlatMap` is like a `Map` except its callable returns a (possibly
empty) iterable of elements for the output `PCollection`.
```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Read previous file, add a name to each greeting and write results.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add two greetings to each name, and write to a file.
(p
- | df.Read('load messages', df.io.TextFileSource('./names'))
- | df.FlatMap('add greetings',
- lambda name, msgs: ['%s %s!' % (m, name) for m in msgs],
- ['Hello', 'Hola'])
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greetings' >> beam.FlatMap(
+ lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
+ ['Hello', 'Hola'])
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
p.run()
```
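[Editor's note: conceptually, `FlatMap` maps each element to an iterable and flattens the results into a single output `PCollection`. A plain-Python sketch of that semantics (the `flat_map` helper is illustrative, not the Beam API):]

```python
def flat_map(fn, elements):
    # Apply fn to each element and flatten the resulting iterables.
    return [out for elem in elements for out in fn(elem)]

names = ['Ann', 'Joe']
greetings = flat_map(
    lambda name: ['%s %s!' % (msg, name) for msg in ['Hello', 'Hola']],
    names)
# greetings == ['Hello Ann!', 'Hola Ann!', 'Hello Joe!', 'Hola Joe!']
```

Because the callable may return an empty iterable, `FlatMap` can also drop elements, which `Map` cannot.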
-### Hello world (with FlatMap and yield)
+### Basic pipeline (with FlatMap and yield)
The callable of a `FlatMap` can be a generator, that is,
a function using `yield`.
```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-# Add greetings using a FlatMap function using yield.
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+# Read a file containing names, add two greetings to each name
+# (with FlatMap using a yield generator), and write to a file.
def add_greetings(name, messages):
- for m in messages:
- yield '%s %s!' % (m, name)
+ for msg in messages:
+ yield '%s %s!' % (msg, name)
(p
- | df.Read('load names', df.io.TextFileSource('./names'))
- | df.FlatMap('greet', add_greetings, ['Hello', 'Hola'])
- | df.Write('save', df.io.TextFileSink('./greetings')))
+ | 'load names' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
+ | 'save' >> beam.Write(beam.io.TextFileSink('./greetings')))
p.run()
```
### Counting words
-This example counts the words in a text and also shows how to read a
-text file from [Google Cloud Storage](https://cloud.google.com/storage/).
+This example shows how to read a text file from
+[Google Cloud Storage](https://cloud.google.com/storage/)
+and count its words.
```python
import re
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
(p
- | df.Read('read',
- df.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
- | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
- | df.combiners.Count.PerElement('count words')
- | df.Write('write', df.io.TextFileSink('./results')))
+ | 'read' >> beam.Read(
+ beam.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt'))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
+ | 'count words' >> beam.combiners.Count.PerElement()
+ | 'save' >> beam.Write(beam.io.TextFileSink('./word_count')))
p.run()
```
### Counting words with GroupByKey
-Here we use `GroupByKey` to count the words.
-This is a somewhat forced example of `GroupByKey`; normally one would use
-the transform `df.combiners.Count.PerElement`, as in the previous example.
-The example also shows the use of a wild-card in specifying the text file
-source.
-
+This is a somewhat forced example of `GroupByKey` that achieves the same
+functionality as the previous example without using
+`beam.combiners.Count.PerElement`. It also demonstrates the use of a
+wildcard to specify the text file source.
```python
import re
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
-class MyCountTransform(df.PTransform):
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
+class MyCountTransform(beam.PTransform):
def apply(self, pcoll):
return (pcoll
- | df.Map('one word', lambda w: (w, 1))
- # GroupByKey accepts a PCollection of (w, 1) and
- # outputs a PCollection of (w, (1, 1, ...))
- | df.GroupByKey('group words')
- | df.Map('count words', lambda (word, counts): (word, len(counts))))
+ | 'one word' >> beam.Map(lambda word: (word, 1))
+ # GroupByKey accepts a PCollection of (word, 1) elements and
+ # outputs a PCollection of (word, [1, 1, ...])
+ | 'group words' >> beam.GroupByKey()
+ | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))))
(p
- | df.Read('read', df.io.TextFileSource('./names*'))
- | df.FlatMap('split', lambda x: re.findall(r'\w+', x))
+ | 'read' >> beam.Read(beam.io.TextFileSource('./names*'))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
| MyCountTransform()
- | df.Write('write', df.io.TextFileSink('./results')))
+ | 'write' >> beam.Write(beam.io.TextFileSink('./word_count')))
p.run()
```
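[Editor's note: the group-then-count logic inside `MyCountTransform` can be sketched in plain Python. The `group_by_key` helper below is illustrative and mimics the (word, 1) → (word, [1, 1, ...]) step; it is not the Beam implementation:]

```python
from collections import defaultdict

def group_by_key(pairs):
    # Collect the values for each key, mimicking GroupByKey.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())

pairs = [('the', 1), ('cat', 1), ('the', 1)]
counts = [(word, len(ones)) for word, ones in group_by_key(pairs)]
# counts == [('the', 2), ('cat', 1)]
```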
### Type hints
-In some cases, you can improve the efficiency of the data encoding by providing
-type hints. For example:
+In some cases, providing type hints can improve the efficiency
+of the data encoding.
```python
-import google.cloud.dataflow as df
-from google.cloud.dataflow.typehints import typehints
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+from apache_beam.typehints import typehints
+p = beam.Pipeline('DirectPipelineRunner')
(p
- | df.Read('A', df.io.TextFileSource('./names'))
- | df.Map('B1', lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
- | df.GroupByKey('GBK')
- | df.Write('C', df.io.TextFileSink('./results')))
+ | 'read' >> beam.Read(beam.io.TextFileSource('./names'))
+ | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
+ | 'group words' >> beam.GroupByKey()
+ | 'save' >> beam.Write(beam.io.TextFileSink('./typed_names')))
p.run()
```
### BigQuery
-Here is a pipeline that reads input from a BigQuery table and writes the result
-to a different table. This example calculates the number of tornadoes per month
-from weather data. To run it you will need to provide an output table that
-you can write to.
+This example calculates the number of tornadoes per month from weather data.
+The input is read from a BigQuery table and the output is written to a
+different table; the user must supply the destination table and project.
```python
-import google.cloud.dataflow as df
+import apache_beam as beam
+project = 'DESTINATION-PROJECT-ID'
input_table = 'clouddataflow-readonly:samples.weather_stations'
-project = 'YOUR-PROJECT'
-output_table = 'DATASET.TABLENAME'
-p = df.Pipeline(argv=['--project', project])
+output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
+
+p = beam.Pipeline(argv=['--project', project])
(p
- | df.Read('read', df.io.BigQuerySource(input_table))
- | df.FlatMap(
- 'months with tornadoes',
- lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
- | df.CombinePerKey('monthly count', sum)
- | df.Map('format', lambda (k, v): {'month': k, 'tornado_count': v})
- | df.Write('write', df.io.BigQuerySink(
- output_table,
- schema='month:INTEGER, tornado_count:INTEGER',
- create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE)))
+ | 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
+ | 'months with tornadoes' >> beam.FlatMap(
+ lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
+ | 'monthly count' >> beam.CombinePerKey(sum)
+ | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})
+ | 'save' >> beam.Write(
+ beam.io.BigQuerySink(
+ output_table,
+ schema='month:INTEGER, tornado_count:INTEGER',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```
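[Editor's note: the per-month counting above boils down to emitting `(month, 1)` for each tornado row and summing per key. A plain-Python sketch with made-up rows (field names follow the example; the data is hypothetical):]

```python
rows = [
    {'month': 1, 'tornado': True},
    {'month': 1, 'tornado': True},
    {'month': 2, 'tornado': False},
    {'month': 2, 'tornado': True},
]
# FlatMap step: emit (month, 1) only for rows where a tornado occurred.
pairs = [(int(row['month']), 1) for row in rows if row['tornado']]
# CombinePerKey(sum) step: sum the ones per month.
counts = {}
for month, one in pairs:
    counts[month] = counts.get(month, 0) + one
# counts == {1: 2, 2: 1}
```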
-Here is a pipeline that achieves the same functionality, i.e., calculates the
-number of tornadoes per month, but uses a query to filter out input instead
-of using the whole table.
+This pipeline calculates the same per-month tornado counts, but uses
+a query to filter the input rows instead of reading the whole table.
```python
-import google.cloud.dataflow as df
-project = 'YOUR-PROJECT'
-output_table = 'DATASET.TABLENAME'
+import apache_beam as beam
+project = 'DESTINATION-PROJECT-ID'
+output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
'FROM [clouddataflow-readonly:samples.weather_stations] ' \
'WHERE tornado=true GROUP BY month'
-p = df.Pipeline(argv=['--project', project])
+p = beam.Pipeline(argv=['--project', project])
(p
-| df.Read('read', df.io.BigQuerySource(query=input_query))
-| df.Write('write', df.io.BigQuerySink(
+ | 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
+ | 'save' >> beam.Write(beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
- create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE)))
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```
### Combiner Examples
-A common case for Dataflow combiners is to sum (or max or min) over the values
-of each key. Such standard Python functions can be used directly as combiner
-functions. In fact, any function "reducing" an iterable to a single value can be
-used.
+Combiners produce a `PCollection` containing one combined value (such as
+the sum, max, or min) of the values for each key in the input `PCollection`.
+Standard Python functions such as `sum`, `max`, and `min` can be used
+directly as combiner functions; in fact, any function "reducing" an
+iterable to a single value can be used.
```python
-import google.cloud.dataflow as df
-p = df.Pipeline('DirectPipelineRunner')
+import apache_beam as beam
+p = beam.Pipeline('DirectPipelineRunner')
SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
(p
- | df.Create(SAMPLE_DATA)
- | df.CombinePerKey(sum)
- | df.Write(df.io.TextFileSink('./results')))
+ | beam.Create(SAMPLE_DATA)
+ | beam.CombinePerKey(sum)
+ | beam.Write(beam.io.TextFileSink('./sums')))
p.run()
```
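[Editor's note: `CombinePerKey(fn)` reduces all values sharing a key with `fn`. A plain-Python sketch of that behavior on the sample data above (the `combine_per_key` helper is illustrative, not the Beam implementation):]

```python
def combine_per_key(fn, pairs):
    # Group values by key, then reduce each group with fn (e.g. sum, max, min).
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return [(key, fn(values)) for key, values in grouped.items()]

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
# combine_per_key(sum, SAMPLE_DATA) == [('a', 6), ('b', 30)]
# combine_per_key(max, SAMPLE_DATA) == [('a', 3), ('b', 20)]
```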