Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:21 UTC

[46/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
new file mode 100644
index 0000000..f6bb63a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -0,0 +1,872 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Code snippets used in Cloud Dataflow webdocs.
+
+The examples here are written specifically to read well with the accompanying
+web docs from https://cloud.google.com/dataflow. Do not rewrite them unless
+you make sure the webdocs still read well and the rewritten code supports the
+concept being described. For example, some snippets could be shorter, but they
+are written this way to make a specific point in the docs.
+
+The code snippets are all organized as self-contained functions. Parts of the
+function body delimited by [START tag] and [END tag] will be included
+automatically in the web docs. The naming convention for the tags is to use
+the PATH_TO_HTML where they are included as a prefix, followed by a
+descriptive string. For instance, a code snippet that will be used as a code
+example at https://cloud.google.com/dataflow/model/pipelines will have the tag
+model_pipelines_DESCRIPTION. The tags can contain only letters, digits and _.
+"""
+
+import google.cloud.dataflow as df
+
+# Quiet some pylint warnings that happen because of the somewhat special
+# format for the code snippets.
+# pylint:disable=invalid-name
+# pylint:disable=expression-not-assigned
+# pylint:disable=redefined-outer-name
+# pylint:disable=unused-variable
+# pylint:disable=g-doc-args
+# pylint:disable=g-import-not-at-top
+
+
+class SnippetUtils(object):
+  from google.cloud.dataflow.pipeline import PipelineVisitor
+
+  class RenameFiles(PipelineVisitor):
+    """RenameFiles will rewire source and sink for unit testing.
+
+    RenameFiles will rewire the GCS files specified in the source and
+    sink of the snippet pipeline to local files so the pipeline can be run as
+    a unit test. This is as close as we can get to having code snippets that
+    are executed and are also ready to be presented in the webdocs.
+    """
+
+    def __init__(self, renames):
+      self.renames = renames
+
+    def visit_transform(self, transform_node):
+      if hasattr(transform_node.transform, 'source'):
+        source = transform_node.transform.source
+        source.file_path = self.renames['read']
+        source.is_gcs_source = False
+      elif hasattr(transform_node.transform, 'sink'):
+        sink = transform_node.transform.sink
+        sink.file_path = self.renames['write']
+        sink.is_gcs_sink = False
+
+
+def construct_pipeline(renames):
+  """A reverse words snippet as an example for constructing a pipeline.
+
+  URL: https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline
+  """
+  import re
+
+  class ReverseWords(df.PTransform):
+    """A PTransform that reverses individual elements in a PCollection."""
+
+    def apply(self, pcoll):
+      return pcoll | df.Map(lambda e: e[::-1])
+
+  def filter_words(unused_x):
+    """Pass through filter to select everything."""
+    return True
+
+  # [START pipelines_constructing_creating]
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  p = df.Pipeline(options=PipelineOptions())
+  # [END pipelines_constructing_creating]
+
+  # [START pipelines_constructing_reading]
+  lines = p | df.io.Read('ReadMyFile',
+                      df.io.TextFileSource('gs://some/inputData.txt'))
+  # [END pipelines_constructing_reading]
+
+  # [START pipelines_constructing_applying]
+  words = lines | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+  reversed_words = words | ReverseWords()
+  # [END pipelines_constructing_applying]
+
+  # [START pipelines_constructing_writing]
+  filtered_words = reversed_words | df.Filter('FilterWords', filter_words)
+  filtered_words | df.io.Write('WriteMyFile',
+                               df.io.TextFileSink('gs://some/outputData.txt'))
+  # [END pipelines_constructing_writing]
+
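+  # Rewire the GCS source and sink above to local test files (see RenameFiles).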
+  p.visit(SnippetUtils.RenameFiles(renames))
+
+  # [START pipelines_constructing_running]
+  p.run()
+  # [END pipelines_constructing_running]
+
+
+def model_pipelines(argv):
+  """A wordcount snippet as a simple pipeline example.
+
+  URL: https://cloud.google.com/dataflow/model/pipelines
+  """
+  # [START model_pipelines]
+  import re
+
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  class MyOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--input',
+                          dest='input',
+                          default='gs://dataflow-samples/shakespeare/kinglear'
+                          '.txt',
+                          help='Input file to process.')
+      parser.add_argument('--output',
+                          dest='output',
+                          required=True,
+                          help='Output file to write results to.')
+
+  pipeline_options = PipelineOptions(argv)
+  my_options = pipeline_options.view_as(MyOptions)
+
+  p = df.Pipeline(options=pipeline_options)
+
+  (p
+   | df.io.Read(df.io.TextFileSource(my_options.input))
+   | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+   | df.Map(lambda x: (x, 1)) | df.combiners.Count.PerKey()
+   | df.io.Write(df.io.TextFileSink(my_options.output)))
+
+  p.run()
+  # [END model_pipelines]
+
+
+def model_pcollection(argv):
+  """Creating a PCollection from data in local memory.
+
+  URL: https://cloud.google.com/dataflow/model/pcollection
+  """
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  class MyOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--output',
+                          dest='output',
+                          required=True,
+                          help='Output file to write results to.')
+
+  pipeline_options = PipelineOptions(argv)
+  my_options = pipeline_options.view_as(MyOptions)
+
+  # [START model_pcollection]
+  p = df.Pipeline(options=pipeline_options)
+
+  (p
+   | df.Create([
+       'To be, or not to be: that is the question: ',
+       'Whether \'tis nobler in the mind to suffer ',
+       'The slings and arrows of outrageous fortune, ',
+       'Or to take arms against a sea of troubles, '])
+   | df.io.Write(df.io.TextFileSink(my_options.output)))
+
+  p.run()
+  # [END model_pcollection]
+
+
+def pipeline_options_remote(argv):
+  """"Creating a Pipeline using a PipelineOptions object for remote execution.
+
+  URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+  """
+
+  from google.cloud.dataflow import Pipeline
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START pipeline_options_create]
+  options = PipelineOptions(flags=argv)
+  # [END pipeline_options_create]
+
+  # [START pipeline_options_define_custom]
+  class MyOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--input')
+      parser.add_argument('--output')
+  # [END pipeline_options_define_custom]
+
+  from google.cloud.dataflow.utils.options import GoogleCloudOptions
+  from google.cloud.dataflow.utils.options import StandardOptions
+
+  # [START pipeline_options_dataflow_service]
+  # Create and set your PipelineOptions.
+  options = PipelineOptions(flags=argv)
+
+  # For Cloud execution, set the Cloud Platform project, job_name,
+  # staging_location, and temp_location, and specify DataflowPipelineRunner
+  # or BlockingDataflowPipelineRunner.
+  google_cloud_options = options.view_as(GoogleCloudOptions)
+  google_cloud_options.project = 'my-project-id'
+  google_cloud_options.job_name = 'myjob'
+  google_cloud_options.staging_location = 'gs://my-bucket/binaries'
+  google_cloud_options.temp_location = 'gs://my-bucket/temp'
+  options.view_as(StandardOptions).runner = 'DataflowPipelineRunner'
+
+  # Create the Pipeline with the specified options.
+  p = Pipeline(options=options)
+  # [END pipeline_options_dataflow_service]
+
+  my_options = options.view_as(MyOptions)
+  my_input = my_options.input
+  my_output = my_options.output
+
+  # Overriding the runner for tests.
+  options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
+  p = Pipeline(options=options)
+
+  lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
+  lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
+
+  p.run()
+
+
+def pipeline_options_local(argv):
+  """"Creating a Pipeline using a PipelineOptions object for local execution.
+
+  URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+  """
+
+  from google.cloud.dataflow import Pipeline
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  options = PipelineOptions(flags=argv)
+
+  # [START pipeline_options_define_custom_with_help_and_default]
+  class MyOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--input',
+                          help='Input for the dataflow pipeline',
+                          default='gs://my-bucket/input')
+      parser.add_argument('--output',
+                          help='Output for the dataflow pipeline',
+                          default='gs://my-bucket/output')
+  # [END pipeline_options_define_custom_with_help_and_default]
+
+  my_options = options.view_as(MyOptions)
+
+  my_input = my_options.input
+  my_output = my_options.output
+
+  # [START pipeline_options_local]
+  # Create and set your Pipeline Options.
+  options = PipelineOptions()
+  p = Pipeline(options=options)
+  # [END pipeline_options_local]
+
+  lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
+  lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
+  p.run()
+
+
+def pipeline_options_command_line(argv):
+  """Creating a Pipeline by passing a list of arguments.
+
+  URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+  """
+
+  # [START pipeline_options_command_line]
+  # Use Python argparse module to parse custom arguments
+  import argparse
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input')
+  parser.add_argument('--output')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  # Create the Pipeline with remaining arguments.
+  p = df.Pipeline(argv=pipeline_args)
+  lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(known_args.input))
+  lines | df.io.Write('WriteToText', df.io.TextFileSink(known_args.output))
+  # [END pipeline_options_command_line]
+
+  p.run()
+
+
+def pipeline_logging(lines, output):
+  """Logging Pipeline Messages.
+
+  URL: https://cloud.google.com/dataflow/pipelines/logging
+  """
+
+  import re
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START pipeline_logging]
+  # Import the Python logging module.
+  import logging
+
+  class ExtractWordsFn(df.DoFn):
+
+    def process(self, context):
+      words = re.findall(r'[A-Za-z\']+', context.element)
+      for word in words:
+        yield word
+
+        if word.lower() == 'love':
+          # Log using the root logger at info or higher levels
+          logging.info('Found : %s', word.lower())
+
+  # Remaining WordCount example code ...
+  # [END pipeline_logging]
+
+  p = df.Pipeline(options=PipelineOptions())
+  (p
+   | df.Create(lines)
+   | df.ParDo('ExtractWords', ExtractWordsFn())
+   | df.io.Write('WriteToText', df.io.TextFileSink(output)))
+
+  p.run()
+
+
+def pipeline_monitoring(renames):
+  """Using monitoring interface snippets.
+
+  URL: https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf
+  """
+
+  import re
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  class WordCountOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--input',
+                          help='Input for the dataflow pipeline',
+                          default='gs://my-bucket/input')
+      parser.add_argument('--output',
+                          help='Output for the dataflow pipeline',
+                          default='gs://my-bucket/output')
+
+  class ExtractWordsFn(df.DoFn):
+
+    def process(self, context):
+      words = re.findall(r'[A-Za-z\']+', context.element)
+      for word in words:
+        yield word
+
+  class FormatCountsFn(df.DoFn):
+
+    def process(self, context):
+      word, count = context.element
+      yield '%s: %s' % (word, count)
+
+  # [START pipeline_monitoring_composite]
+  # The CountWords Composite Transform inside the WordCount pipeline.
+  class CountWords(df.PTransform):
+
+    def apply(self, pcoll):
+      return (pcoll
+              # Convert lines of text into individual words.
+              | df.ParDo('ExtractWords', ExtractWordsFn())
+              # Count the number of times each word occurs.
+              | df.combiners.Count.PerElement()
+              # Format each word and count into a printable string.
+              | df.ParDo('FormatCounts', FormatCountsFn()))
+  # [END pipeline_monitoring_composite]
+
+  pipeline_options = PipelineOptions()
+  options = pipeline_options.view_as(WordCountOptions)
+  p = df.Pipeline(options=pipeline_options)
+
+  # [START pipeline_monitoring_execution]
+  (p
+   # Read the lines of the input text.
+   | df.io.Read('ReadLines', df.io.TextFileSource(options.input))
+   # Count the words.
+   | CountWords()
+   # Write the formatted word counts to output.
+   | df.io.Write('WriteCounts', df.io.TextFileSink(options.output)))
+  # [END pipeline_monitoring_execution]
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+  p.run()
+
+
+def examples_wordcount_minimal(renames):
+  """MinimalWordCount example snippets.
+
+  URL:
+  https://cloud.google.com/dataflow/examples/wordcount-example#MinimalWordCount
+  """
+  import re
+
+  import google.cloud.dataflow as df
+
+  from google.cloud.dataflow.utils.options import GoogleCloudOptions
+  from google.cloud.dataflow.utils.options import StandardOptions
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START examples_wordcount_minimal_options]
+  options = PipelineOptions()
+  google_cloud_options = options.view_as(GoogleCloudOptions)
+  google_cloud_options.project = 'my-project-id'
+  google_cloud_options.job_name = 'myjob'
+  google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
+  google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
+  options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
+  # [END examples_wordcount_minimal_options]
+
+  # Run it locally for testing.
+  options = PipelineOptions()
+
+  # [START examples_wordcount_minimal_create]
+  p = df.Pipeline(options=options)
+  # [END examples_wordcount_minimal_create]
+
+  (
+      # [START examples_wordcount_minimal_read]
+      p | df.io.Read(df.io.TextFileSource(
+          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      # [END examples_wordcount_minimal_read]
+
+      # [START examples_wordcount_minimal_pardo]
+      | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      # [END examples_wordcount_minimal_pardo]
+
+      # [START examples_wordcount_minimal_count]
+      | df.combiners.Count.PerElement()
+      # [END examples_wordcount_minimal_count]
+
+      # [START examples_wordcount_minimal_map]
+      | df.Map(lambda (word, count): '%s: %s' % (word, count))
+      # [END examples_wordcount_minimal_map]
+
+      # [START examples_wordcount_minimal_write]
+      | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
+      # [END examples_wordcount_minimal_write]
+  )
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+
+  # [START examples_wordcount_minimal_run]
+  p.run()
+  # [END examples_wordcount_minimal_run]
+
+
+def examples_wordcount_wordcount(renames):
+  """WordCount example snippets.
+
+  URL:
+  https://cloud.google.com/dataflow/examples/wordcount-example#WordCount
+  """
+  import re
+
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  argv = []
+
+  # [START examples_wordcount_wordcount_options]
+  class WordCountOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--input',
+                          help='Input for the dataflow pipeline',
+                          default='gs://my-bucket/input')
+
+  options = PipelineOptions(argv)
+  p = df.Pipeline(options=options)
+  # [END examples_wordcount_wordcount_options]
+
+  lines = p | df.io.Read(df.io.TextFileSource(
+      'gs://dataflow-samples/shakespeare/kinglear.txt'))
+
+  # [START examples_wordcount_wordcount_composite]
+  class CountWords(df.PTransform):
+
+    def apply(self, pcoll):
+      return (pcoll
+              # Convert lines of text into individual words.
+              | df.FlatMap(
+                  'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+
+              # Count the number of times each word occurs.
+              | df.combiners.Count.PerElement())
+
+  counts = lines | CountWords()
+  # [END examples_wordcount_wordcount_composite]
+
+  # [START examples_wordcount_wordcount_dofn]
+  class FormatAsTextFn(df.DoFn):
+
+    def process(self, context):
+      word, count = context.element
+      yield '%s: %s' % (word, count)
+
+  formatted = counts | df.ParDo(FormatAsTextFn())
+  # [END examples_wordcount_wordcount_dofn]
+
+  formatted | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
+  p.visit(SnippetUtils.RenameFiles(renames))
+  p.run()
+
+
+def examples_wordcount_debugging(renames):
+  """DebuggingWordCount example snippets.
+
+  URL:
+  https://cloud.google.com/dataflow/examples/wordcount-example#DebuggingWordCount
+  """
+  import re
+
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START example_wordcount_debugging_logging]
+  # [START example_wordcount_debugging_aggregators]
+  import logging
+
+  class FilterTextFn(df.DoFn):
+    """A DoFn that filters for a specific key based on a regular expression."""
+
+    # A custom aggregator can track values in your pipeline as it runs. Create
+    # custom aggregators matched_words and unmatched_words.
+    matched_words = df.Aggregator('matched_words')
+    unmatched_words = df.Aggregator('unmatched_words')
+
+    def __init__(self, pattern):
+      self.pattern = pattern
+
+    def process(self, context):
+      word, _ = context.element
+      if re.match(self.pattern, word):
+        # Log at INFO level each element we match. When executing this pipeline
+        # using the Dataflow service, these log lines will appear in the Cloud
+        # Logging UI.
+        logging.info('Matched %s', word)
+
+        # Add 1 to the custom aggregator matched_words
+        context.aggregate_to(self.matched_words, 1)
+        yield context.element
+      else:
+        # Log at the "DEBUG" level each element that is not matched. Different
+        # log levels can be used to control the verbosity of logging providing
+        # an effective mechanism to filter less important information. Note
+        # currently only "INFO" and higher level logs are emitted to the Cloud
+        # Logger. This log message will not be visible in the Cloud Logger.
+        logging.debug('Did not match %s', word)
+
+        # Add 1 to the custom aggregator unmatched_words.
+        context.aggregate_to(self.unmatched_words, 1)
+  # [END example_wordcount_debugging_logging]
+  # [END example_wordcount_debugging_aggregators]
+
+  p = df.Pipeline(options=PipelineOptions())
+  filtered_words = (
+      p
+      | df.io.Read(df.io.TextFileSource(
+          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      | df.combiners.Count.PerElement()
+      | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+
+  # [START example_wordcount_debugging_assert]
+  df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
+  # [END example_wordcount_debugging_assert]
+
+  output = (filtered_words
+            | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+            | df.io.Write(
+                'write', df.io.TextFileSink('gs://my-bucket/counts.txt')))
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+  p.run()
+
+
+def model_textio(renames):
+  """Using a Read and Write transform to read/write text files.
+
+  URLs:
+    https://cloud.google.com/dataflow/model/pipeline-io
+    https://cloud.google.com/dataflow/model/text-io
+  """
+  def filter_words(x):
+    import re
+    return re.findall(r'[A-Za-z\']+', x)
+
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START model_textio_read]
+  p = df.Pipeline(options=PipelineOptions())
+  # [START model_pipelineio_read]
+  lines = p | df.io.Read(
+      'ReadFromText',
+      df.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
+  # [END model_pipelineio_read]
+  # [END model_textio_read]
+
+  # [START model_textio_write]
+  filtered_words = lines | df.FlatMap('FilterWords', filter_words)
+  # [START model_pipelineio_write]
+  filtered_words | df.io.Write(
+      'WriteToText', df.io.TextFileSink('gs://my_bucket/path/to/numbers',
+                                        file_name_suffix='.csv'))
+  # [END model_pipelineio_write]
+  # [END model_textio_write]
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+  p.run()
+
+
+def model_bigqueryio():
+  """Using a Read and Write transform to read/write to BigQuery.
+
+  URL: https://cloud.google.com/dataflow/model/bigquery-io
+  """
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+
+  # [START model_bigqueryio_read]
+  p = df.Pipeline(options=PipelineOptions())
+  weather_data = p | df.io.Read(
+      'ReadWeatherStations',
+      df.io.BigQuerySource(
+          'clouddataflow-readonly:samples.weather_stations'))
+  # [END model_bigqueryio_read]
+
+  # [START model_bigqueryio_query]
+  p = df.Pipeline(options=PipelineOptions())
+  weather_data = p | df.io.Read(
+      'ReadYearAndTemp',
+      df.io.BigQuerySource(
+          query='SELECT year, mean_temp FROM samples.weather_stations'))
+  # [END model_bigqueryio_query]
+
+  # [START model_bigqueryio_schema]
+  schema = 'source:STRING, quote:STRING'
+  # [END model_bigqueryio_schema]
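+  # The schema is a comma-separated list of 'field_name:FIELD_TYPE' pairs
+  # describing the destination table.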
+
+  # [START model_bigqueryio_write]
+  quotes = p | df.Create(
+      [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'}])
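+  # WRITE_TRUNCATE below replaces any existing rows in the output table;
+  # CREATE_IF_NEEDED creates the table if it does not already exist.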
+  quotes | df.io.Write(
+      'Write', df.io.BigQuerySink(
+          'my-project:output.output_table',
+          schema=schema,
+          write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE,
+          create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED))
+  # [END model_bigqueryio_write]
+
+
+def model_composite_transform_example(contents, output_path):
+  """Example of a composite transform.
+
+  To declare a composite transform, define a subclass of PTransform.
+
+  Override the apply method with a method that takes a PCollection as its
+  only parameter and returns a PCollection.
+
+  URL: https://cloud.google.com/dataflow/model/composite-transforms
+  """
+  import re
+
+  import google.cloud.dataflow as df
+
+  # [START composite_transform_example]
+  # [START composite_ptransform_apply_method]
+  # [START composite_ptransform_declare]
+  class CountWords(df.PTransform):
+    # [END composite_ptransform_declare]
+
+    def apply(self, pcoll):
+      return (pcoll
+              | df.FlatMap(lambda x: re.findall(r'\w+', x))
+              | df.combiners.Count.PerElement()
+              | df.Map(lambda (word, c): '%s: %s' % (word, c)))
+  # [END composite_ptransform_apply_method]
+  # [END composite_transform_example]
+
+  from google.cloud.dataflow.utils.options import PipelineOptions
+  p = df.Pipeline(options=PipelineOptions())
+  (p
+   | df.Create(contents)
+   | CountWords()
+   | df.io.Write(df.io.TextFileSink(output_path)))
+  p.run()
+
+
+def model_multiple_pcollections_flatten(contents, output_path):
+  """Merging a PCollection with Flatten.
+
+  URL: https://cloud.google.com/dataflow/model/multiple-pcollections
+  """
+  some_hash_fn = lambda s: ord(s[0])
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+  p = df.Pipeline(options=PipelineOptions())
+  partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
+
+  # Partition into three groups based on a hash of each element.
+  partitioned = p | df.Create(contents) | df.Partition(partition_fn, 3)
+  pcoll1 = partitioned[0]
+  pcoll2 = partitioned[1]
+  pcoll3 = partitioned[2]
+
+  # Flatten them back into a single PCollection.
+
+  # A collection of PCollection objects can be represented simply
+  # as a tuple (or list) of PCollections.
+  # (The SDK for Python has no separate type to store multiple
+  # PCollection objects, whether containing the same or different
+  # types.)
+  # [START model_multiple_pcollections_flatten]
+  merged = (
+      # [START model_multiple_pcollections_tuple]
+      (pcoll1, pcoll2, pcoll3)
+      # [END model_multiple_pcollections_tuple]
+      # A list of tuples can be "piped" directly into a Flatten transform.
+      | df.Flatten())
+  # [END model_multiple_pcollections_flatten]
+  merged | df.io.Write(df.io.TextFileSink(output_path))
+
+  p.run()
+
+
+def model_multiple_pcollections_partition(contents, output_path):
+  """Splitting a PCollection with Partition.
+
+  URL: https://cloud.google.com/dataflow/model/multiple-pcollections
+  """
+  some_hash_fn = lambda s: ord(s[0])
+  def get_percentile(i):
+    """Assume i in [0,100)."""
+    return i
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+  p = df.Pipeline(options=PipelineOptions())
+
+  students = p | df.Create(contents)
+  # [START model_multiple_pcollections_partition]
+  def partition_fn(student, num_partitions):
+    return int(get_percentile(student) * num_partitions / 100)
+
+  by_decile = students | df.Partition(partition_fn, 10)
+  # [END model_multiple_pcollections_partition]
+  # [START model_multiple_pcollections_partition_40th]
+  fortieth_percentile = by_decile[4]
+  # [END model_multiple_pcollections_partition_40th]
+
+  ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
+   | df.Flatten()
+   | df.io.Write(df.io.TextFileSink(output_path)))
+
+  p.run()
+
+
+def model_group_by_key(contents, output_path):
+  """Applying a GroupByKey Transform.
+
+  URL: https://cloud.google.com/dataflow/model/group-by-key
+  """
+  import re
+
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+  p = df.Pipeline(options=PipelineOptions())
+  words_and_counts = (
+      p
+      | df.Create(contents)
+      | df.FlatMap(lambda x: re.findall(r'\w+', x))
+      | df.Map('one word', lambda w: (w, 1)))
+  # GroupByKey accepts a PCollection of (w, 1) and
+  # outputs a PCollection of (w, (1, 1, ...)).
+  # (A key/value pair is just a tuple in Python.)
+  # This is a somewhat forced example, since one could
+  # simply use df.combiners.Count.PerElement here.
+  # [START model_group_by_key_transform]
+  grouped_words = words_and_counts | df.GroupByKey()
+  # [END model_group_by_key_transform]
+  (grouped_words
+   | df.Map('count words', lambda (word, counts): (word, len(counts)))
+   | df.io.Write(df.io.TextFileSink(output_path)))
+  p.run()
+
+
+def model_co_group_by_key_tuple(email_list, phone_list, output_path):
+  """Applying a CoGroupByKey Transform to a tuple.
+
+  URL: https://cloud.google.com/dataflow/model/group-by-key
+  """
+  import google.cloud.dataflow as df
+  from google.cloud.dataflow.utils.options import PipelineOptions
+  p = df.Pipeline(options=PipelineOptions())
+  # [START model_group_by_key_cogroupbykey_tuple]
+  # Each data set is represented by key-value pairs in separate PCollections.
+  # Both data sets share a common key type (in this example str).
+  # The email_list contains values such as: ('joe', 'joe@example.com') with
+  # multiple possible values for each key.
+  # The phone_list contains values such as: ('mary', '111-222-3333') with
+  # multiple possible values for each key.
+  emails = p | df.Create('email', email_list)
+  phones = p | df.Create('phone', phone_list)
+  # The result PCollection contains one key-value element for each key in the
+  # input PCollections. The key of the pair will be the key from the input and
+  # the value will be a dictionary with two entries: 'emails', an iterable of
+  # all values for the current key in the emails PCollection, and 'phones', an
+  # iterable of all values for the current key in the phones PCollection.
+  # For instance, if 'emails' contained ('joe', 'joe@example.com') and
+  # ('joe', 'joe@gmail.com'), then 'result' will contain the element
+  # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
+  result = {'emails': emails, 'phones': phones} | df.CoGroupByKey()
+
+  def join_info((name, info)):
+    return '; '.join(['%s' % name,
+                      '%s' % ','.join(info['emails']),
+                      '%s' % ','.join(info['phones'])])
+
+  contact_lines = result | df.Map(join_info)
+  # [END model_group_by_key_cogroupbykey_tuple]
+  contact_lines | df.io.Write(df.io.TextFileSink(output_path))
+  p.run()
+
+
+# [START model_library_transforms_keys]
+class Keys(df.PTransform):
+
+  def apply(self, pcoll):
+    return pcoll | df.Map('Keys', lambda (k, v): k)
+# [END model_library_transforms_keys]
+# pylint: enable=invalid-name
+
+
+# [START model_library_transforms_count]
+class Count(df.PTransform):
+
+  def apply(self, pcoll):
+    return (
+        pcoll
+        | df.Map('Init', lambda v: (v, 1))
+        | df.CombinePerKey(sum))
+# [END model_library_transforms_count]
+# pylint: enable=g-wrong-blank-lines

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
new file mode 100644
index 0000000..4c2014f
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -0,0 +1,560 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for all code snippets used in public docs."""
+
+import logging
+import sys
+import tempfile
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import io
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.examples.snippets import snippets
+from google.cloud.dataflow.io import fileio
+from google.cloud.dataflow.utils.options import TypeOptions
+
+
+# Monkey-patch to use the native sink for file path re-writing.
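+# (snippets.SnippetUtils.RenameFiles rewrites sink.file_path, which the native
+# text sink exposes, so tests can write to local temporary files.)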
+io.TextFileSink = fileio.NativeTextFileSink
+
+
+class ParDoTest(unittest.TestCase):
+  """Tests for dataflow/model/par-do."""
+
+  def test_pardo(self):
+    # Note: "words" and "ComputeWordLengthFn" are referenced by name in
+    # the text of the doc.
+
+    words = ['aa', 'bbb', 'c']
+    # [START model_pardo_pardo]
+    class ComputeWordLengthFn(df.DoFn):
+      def process(self, context):
+        return [len(context.element)]
+    # [END model_pardo_pardo]
+
+    # [START model_pardo_apply]
+    # Apply a ParDo to the PCollection "words" to compute lengths for each word.
+    word_lengths = words | df.ParDo(ComputeWordLengthFn())
+    # [END model_pardo_apply]
+    self.assertEqual({2, 3, 1}, set(word_lengths))
+
+  def test_pardo_yield(self):
+    words = ['aa', 'bbb', 'c']
+    # [START model_pardo_yield]
+    class ComputeWordLengthFn(df.DoFn):
+      def process(self, context):
+        yield len(context.element)
+    # [END model_pardo_yield]
+
+    word_lengths = words | df.ParDo(ComputeWordLengthFn())
+    self.assertEqual({2, 3, 1}, set(word_lengths))
+
+  def test_pardo_using_map(self):
+    words = ['aa', 'bbb', 'c']
+    # [START model_pardo_using_map]
+    word_lengths = words | df.Map(len)
+    # [END model_pardo_using_map]
+
+    self.assertEqual({2, 3, 1}, set(word_lengths))
+
+  def test_pardo_using_flatmap(self):
+    words = ['aa', 'bbb', 'c']
+    # [START model_pardo_using_flatmap]
+    word_lengths = words | df.FlatMap(lambda word: [len(word)])
+    # [END model_pardo_using_flatmap]
+
+    self.assertEqual({2, 3, 1}, set(word_lengths))
+
+  def test_pardo_using_flatmap_yield(self):
+    words = ['aA', 'bbb', 'C']
+    # [START model_pardo_using_flatmap_yield]
+    def capitals(word):
+      for letter in word:
+        if 'A' <= letter <= 'Z':
+          yield letter
+    all_capitals = words | df.FlatMap(capitals)
+    # [END model_pardo_using_flatmap_yield]
+
+    self.assertEqual({'A', 'C'}, set(all_capitals))
+
+  def test_pardo_with_label(self):
+    words = ['aa', 'bbc', 'defg']
+    # [START model_pardo_with_label]
+    result = words | df.Map('CountUniqueLetters', lambda word: len(set(word)))
+    # [END model_pardo_with_label]
+
+    self.assertEqual({1, 2, 4}, set(result))
+
+  def test_pardo_side_input(self):
+    p = df.Pipeline('DirectPipelineRunner')
+    words = p | df.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+
+    # [START model_pardo_side_input]
+    # Callable takes additional arguments.
+    def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+      if lower_bound <= len(word) <= upper_bound:
+        yield word
+
+    # Construct a deferred side input.
+    avg_word_len = words | df.Map(len) | df.CombineGlobally(df.combiners.MeanCombineFn())
+
+    # Call with explicit side inputs.
+    small_words = words | df.FlatMap('small', filter_using_length, 0, 3)
+
+    # A single deferred side input.
+    larger_than_average = words | df.FlatMap('large',
+                                             filter_using_length,
+                                             lower_bound=pvalue.AsSingleton(avg_word_len))
+
+    # Mix and match.
+    small_but_nontrivial = words | df.FlatMap(filter_using_length,
+                                              lower_bound=2,
+                                              upper_bound=pvalue.AsSingleton(avg_word_len))
+    # [END model_pardo_side_input]
+
+    df.assert_that(small_words, df.equal_to(['a', 'bb', 'ccc']))
+    df.assert_that(larger_than_average, df.equal_to(['ccc', 'dddd']),
+                   label='larger_than_average')
+    df.assert_that(small_but_nontrivial, df.equal_to(['bb']),
+                   label='small_but_not_trivial')
+    p.run()
+
+  def test_pardo_side_input_dofn(self):
+    words = ['a', 'bb', 'ccc', 'dddd']
+
+    # [START model_pardo_side_input_dofn]
+    class FilterUsingLength(df.DoFn):
+      def process(self, context, lower_bound, upper_bound=float('inf')):
+        if lower_bound <= len(context.element) <= upper_bound:
+          yield context.element
+
+    small_words = words | df.ParDo(FilterUsingLength(), 0, 3)
+    # [END model_pardo_side_input_dofn]
+    self.assertEqual({'a', 'bb', 'ccc'}, set(small_words))
+
+  def test_pardo_with_side_outputs(self):
+    # [START model_pardo_emitting_values_on_side_outputs]
+    class ProcessWords(df.DoFn):
+
+      def process(self, context, cutoff_length, marker):
+        if len(context.element) <= cutoff_length:
+          # Emit this short word to the main output.
+          yield context.element
+        else:
+          # Emit this word's long length to a side output.
+          yield pvalue.SideOutputValue(
+              'above_cutoff_lengths', len(context.element))
+        if context.element.startswith(marker):
+          # Emit this word to a different side output.
+          yield pvalue.SideOutputValue('marked strings', context.element)
+    # [END model_pardo_emitting_values_on_side_outputs]
+
+    words = ['a', 'an', 'the', 'music', 'xyz']
+
+    # [START model_pardo_with_side_outputs]
+    results = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
+                         .with_outputs('above_cutoff_lengths',
+                                       'marked strings',
+                                       main='below_cutoff_strings'))
+    below = results.below_cutoff_strings
+    above = results.above_cutoff_lengths
+    marked = results['marked strings']  # indexing works as well
+    # [END model_pardo_with_side_outputs]
+
+    self.assertEqual({'a', 'an'}, set(below))
+    self.assertEqual({3, 5}, set(above))
+    self.assertEqual({'xyz'}, set(marked))
+
+    # [START model_pardo_with_side_outputs_iter]
+    below, above, marked = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
+                                      .with_outputs('above_cutoff_lengths',
+                                                    'marked strings',
+                                                    main='below_cutoff_strings'))
+    # [END model_pardo_with_side_outputs_iter]
+
+    self.assertEqual({'a', 'an'}, set(below))
+    self.assertEqual({3, 5}, set(above))
+    self.assertEqual({'xyz'}, set(marked))
+
+  def test_pardo_with_undeclared_side_outputs(self):
+    numbers = [1, 2, 3, 4, 5, 10, 20]
+    # [START model_pardo_with_side_outputs_undeclared]
+    def even_odd(x):
+      yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
+      if x % 10 == 0:
+        yield x
+
+    results = numbers | df.FlatMap(even_odd).with_outputs()
+
+    evens = results.even
+    odds = results.odd
+    tens = results[None]  # the undeclared main output
+    # [END model_pardo_with_side_outputs_undeclared]
+
+    self.assertEqual({2, 4, 10, 20}, set(evens))
+    self.assertEqual({1, 3, 5}, set(odds))
+    self.assertEqual({10, 20}, set(tens))
+
+
+class TypeHintsTest(unittest.TestCase):
+
+  def test_bad_types(self):
+    p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+
+    # [START type_hints_missing_define_numbers]
+    numbers = p | df.Create(['1', '2', '3'])
+    # [END type_hints_missing_define_numbers]
+
+    # Consider the following code.
+    # [START type_hints_missing_apply]
+    evens = numbers | df.Filter(lambda x: x % 2 == 0)
+    # [END type_hints_missing_apply]
+
+    # Now suppose numbers was defined as in the snippet above.
+    # When running this pipeline, you'd get a runtime error,
+    # possibly on a remote machine, possibly very late.
+
+    with self.assertRaises(TypeError):
+      p.run()
+
+    # To catch this early, we can assert what types we expect.
+    with self.assertRaises(typehints.TypeCheckError):
+      # [START type_hints_takes]
+      p.options.view_as(TypeOptions).pipeline_type_check = True
+      evens = numbers | df.Filter(lambda x: x % 2 == 0).with_input_types(int)
+      # [END type_hints_takes]
+
+    # Type hints can be declared on DoFns and callables as well, rather
+    # than where they're used, to be more self-contained.
+    with self.assertRaises(typehints.TypeCheckError):
+      # [START type_hints_do_fn]
+      @df.typehints.with_input_types(int)
+      class FilterEvensDoFn(df.DoFn):
+        def process(self, context):
+          if context.element % 2 == 0:
+            yield context.element
+      evens = numbers | df.ParDo(FilterEvensDoFn())
+      # [END type_hints_do_fn]
+
+    words = p | df.Create('words', ['a', 'bb', 'c'])
+    # Output type hints can be declared and applied to transforms as well. This
+    # helps document the contract and checks it at pipeline construction time.
+    # [START type_hints_transform]
+    T = df.typehints.TypeVariable('T')
+    @df.typehints.with_input_types(T)
+    @df.typehints.with_output_types(df.typehints.Tuple[int, T])
+    class MyTransform(df.PTransform):
+      def apply(self, pcoll):
+        return pcoll | df.Map(lambda x: (len(x), x))
+
+    words_with_lens = words | MyTransform()
+    # [END type_hints_transform]
+
+    with self.assertRaises(typehints.TypeCheckError):
+      words_with_lens | df.Map(lambda x: x).with_input_types(
+          df.typehints.Tuple[int, int])
+
+  def test_runtime_checks_off(self):
+    p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    # [START type_hints_runtime_off]
+    p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
+    p.run()
+    # [END type_hints_runtime_off]
+
+  def test_runtime_checks_on(self):
+    p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    with self.assertRaises(typehints.TypeCheckError):
+      # [START type_hints_runtime_on]
+      p.options.view_as(TypeOptions).runtime_type_check = True
+      p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
+      p.run()
+      # [END type_hints_runtime_on]
+
+  def test_deterministic_key(self):
+    p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+    lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']
+
+    # [START type_hints_deterministic_key]
+    class Player(object):
+      def __init__(self, team, name):
+        self.team = team
+        self.name = name
+
+    class PlayerCoder(df.coders.Coder):
+      def encode(self, player):
+        return '%s:%s' % (player.team, player.name)
+
+      def decode(self, s):
+        return Player(*s.split(':'))
+
+      def is_deterministic(self):
+        return True
+
+    df.coders.registry.register_coder(Player, PlayerCoder)
+
+    def parse_player_and_score(csv):
+      name, team, score = csv.split(',')
+      return Player(team, name), int(score)
+
+    totals = (
+        lines
+        | df.Map(parse_player_and_score)
+        | df.CombinePerKey(sum).with_input_types(df.typehints.Tuple[Player, int]))
+    # [END type_hints_deterministic_key]
+
+    self.assertEquals(
+        {('banana', 3), ('kiwi', 4), ('zucchini', 3)},
+        set(totals | df.Map(lambda (k, v): (k.name, v))))
+
+
+class SnippetsTest(unittest.TestCase):
+
+  def create_temp_file(self, contents=''):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def get_output(self, path, sorted_output=True, suffix=''):
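+    # The tests expect a single output shard, named <path>-00000-of-00001.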
+    with open(path + '-00000-of-00001' + suffix) as f:
+      lines = f.readlines()
+    if sorted_output:
+      return sorted(s.rstrip('\n') for s in lines)
+    else:
+      return [s.rstrip('\n') for s in lines]
+
+  def test_model_pipelines(self):
+    temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
+    result_path = temp_path + '.result'
+    snippets.model_pipelines([
+        '--input=%s*' % temp_path,
+        '--output=%s' % result_path])
+    self.assertEqual(
+        self.get_output(result_path),
+        [str(s) for s in [(u'aa', 1), (u'bb', 2), (u'cc', 3)]])
+
+  def test_model_pcollection(self):
+    temp_path = self.create_temp_file()
+    snippets.model_pcollection(['--output=%s' % temp_path])
+    self.assertEqual(self.get_output(temp_path, sorted_output=False), [
+        'To be, or not to be: that is the question: ',
+        'Whether \'tis nobler in the mind to suffer ',
+        'The slings and arrows of outrageous fortune, ',
+        'Or to take arms against a sea of troubles, '])
+
+  def test_construct_pipeline(self):
+    temp_path = self.create_temp_file(
+        'abc def ghi\n jkl mno pqr\n stu vwx yz')
+    result_path = self.create_temp_file()
+    snippets.construct_pipeline({'read': temp_path, 'write': result_path})
+    self.assertEqual(
+        self.get_output(result_path),
+        ['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy'])
+
+  def test_model_textio(self):
+    temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
+    result_path = temp_path + '.result'
+    snippets.model_textio({'read': temp_path, 'write': result_path})
+    self.assertEqual(
+        ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
+        self.get_output(result_path, suffix='.csv'))
+
+  def test_model_bigqueryio(self):
+    # We cannot test BigQueryIO functionality in unit tests, so we limit
+    # ourselves to making sure that a pipeline containing BigQuery sources and
+    # sinks can be built.
+    self.assertEqual(None, snippets.model_bigqueryio())
+
+  def _run_test_pipeline_for_options(self, fn):
+    temp_path = self.create_temp_file('aa\nbb\ncc')
+    result_path = temp_path + '.result'
+    fn([
+        '--input=%s*' % temp_path,
+        '--output=%s' % result_path])
+    self.assertEqual(
+        ['aa', 'bb', 'cc'],
+        self.get_output(result_path))
+
+  def test_pipeline_options_local(self):
+    self._run_test_pipeline_for_options(snippets.pipeline_options_local)
+
+  def test_pipeline_options_remote(self):
+    self._run_test_pipeline_for_options(snippets.pipeline_options_remote)
+
+  def test_pipeline_options_command_line(self):
+    self._run_test_pipeline_for_options(snippets.pipeline_options_command_line)
+
+  def test_pipeline_logging(self):
+    result_path = self.create_temp_file()
+    lines = ['we found love right where we are',
+             'we found love right from the start',
+             'we found love in a hopeless place']
+    snippets.pipeline_logging(lines, result_path)
+    self.assertEqual(
+        sorted(' '.join(lines).split(' ')),
+        self.get_output(result_path))
+
+  def test_examples_wordcount(self):
+    pipelines = [snippets.examples_wordcount_minimal,
+                 snippets.examples_wordcount_wordcount,
+                 snippets.pipeline_monitoring]
+
+    for pipeline in pipelines:
+      temp_path = self.create_temp_file(
+          'abc def ghi\n abc jkl')
+      result_path = self.create_temp_file()
+      pipeline({'read': temp_path, 'write': result_path})
+      self.assertEqual(
+          self.get_output(result_path),
+          ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1'])
+
+  def test_examples_wordcount_debugging(self):
+    temp_path = self.create_temp_file(
+        'Flourish Flourish Flourish stomach abc def')
+    result_path = self.create_temp_file()
+    snippets.examples_wordcount_debugging(
+        {'read': temp_path, 'write': result_path})
+    self.assertEqual(
+        self.get_output(result_path),
+        ['Flourish: 3', 'stomach: 1'])
+
+  def test_model_composite_transform_example(self):
+    contents = ['aa bb cc', 'bb cc', 'cc']
+    result_path = self.create_temp_file()
+    snippets.model_composite_transform_example(contents, result_path)
+    self.assertEqual(['aa: 1', 'bb: 2', 'cc: 3'], self.get_output(result_path))
+
+  def test_model_multiple_pcollections_flatten(self):
+    contents = ['a', 'b', 'c', 'd', 'e', 'f']
+    result_path = self.create_temp_file()
+    snippets.model_multiple_pcollections_flatten(contents, result_path)
+    self.assertEqual(contents, self.get_output(result_path))
+
+  def test_model_multiple_pcollections_partition(self):
+    contents = [17, 42, 64, 32, 0, 99, 53, 89]
+    result_path = self.create_temp_file()
+    snippets.model_multiple_pcollections_partition(contents, result_path)
+    self.assertEqual(['0', '17', '32', '42', '53', '64', '89', '99'],
+                     self.get_output(result_path))
+
+  def test_model_group_by_key(self):
+    contents = ['a bb ccc bb bb a']
+    result_path = self.create_temp_file()
+    snippets.model_group_by_key(contents, result_path)
+    expected = [('a', 2), ('bb', 3), ('ccc', 1)]
+    self.assertEqual([str(s) for s in expected], self.get_output(result_path))
+
+  def test_model_co_group_by_key_tuple(self):
+    email_list = [['a', 'a@example.com'], ['b', 'b@example.com']]
+    phone_list = [['a', 'x4312'], ['b', 'x8452']]
+    result_path = self.create_temp_file()
+    snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
+    expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
+    self.assertEqual(expect, self.get_output(result_path))
+
+
+class CombineTest(unittest.TestCase):
+  """Tests for dataflow/model/combine."""
+
+  def test_global_sum(self):
+    pc = [1, 2, 3]
+    # [START global_sum]
+    result = pc | df.CombineGlobally(sum)
+    # [END global_sum]
+    self.assertEqual([6], result)
+
+  def test_combine_values(self):
+    occurrences = [('cat', 1), ('cat', 5), ('cat', 9), ('dog', 5), ('dog', 2)]
+    # [START combine_values]
+    first_occurrences = occurrences | df.GroupByKey() | df.CombineValues(min)
+    # [END combine_values]
+    self.assertEqual({('cat', 1), ('dog', 2)}, set(first_occurrences))
+
+  def test_combine_per_key(self):
+    player_accuracies = [
+        ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1),
+        ('dog', 5), ('dog', 2)]
+    # [START combine_per_key]
+    avg_accuracy_per_player = player_accuracies | df.CombinePerKey(df.combiners.MeanCombineFn())
+    # [END combine_per_key]
+    self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player))
+
+  def test_combine_concat(self):
+    pc = ['a', 'b']
+    # [START combine_concat]
+    def concat(values, separator=', '):
+      return separator.join(values)
+    with_commas = pc | df.CombineGlobally(concat)
+    with_dashes = pc | df.CombineGlobally(concat, separator='-')
+    # [END combine_concat]
+    self.assertEqual(1, len(with_commas))
+    self.assertTrue(with_commas[0] in {'a, b', 'b, a'})
+    self.assertEqual(1, len(with_dashes))
+    self.assertTrue(with_dashes[0] in {'a-b', 'b-a'})
+
+  def test_bounded_sum(self):
+    # [START combine_bounded_sum]
+    pc = [1, 10, 100, 1000]
+    def bounded_sum(values, bound=500):
+      return min(sum(values), bound)
+    small_sum = pc | df.CombineGlobally(bounded_sum)              # [500]
+    large_sum = pc | df.CombineGlobally(bounded_sum, bound=5000)  # [1111]
+    # [END combine_bounded_sum]
+    self.assertEqual([500], small_sum)
+    self.assertEqual([1111], large_sum)
+
+  def test_combine_reduce(self):
+    factors = [2, 3, 5, 7]
+    # [START combine_reduce]
+    import functools
+    import operator
+    product = factors | df.CombineGlobally(functools.partial(reduce, operator.mul), 1)
+    # [END combine_reduce]
+    self.assertEqual([210], product)
+
+  def test_custom_average(self):
+    pc = [2, 3, 5, 7]
+
+
+    # [START combine_custom_average]
+    class AverageFn(df.CombineFn):
+      def create_accumulator(self):
+        return (0.0, 0)
+      def add_input(self, (sum, count), input):
+        return sum + input, count + 1
+      def merge_accumulators(self, accumulators):
+        sums, counts = zip(*accumulators)
+        return sum(sums), sum(counts)
+      def extract_output(self, (sum, count)):
+        return sum / count if count else float('NaN')
+    average = pc | df.CombineGlobally(AverageFn())
+    # [END combine_custom_average]
+    self.assertEqual([4.25], average)
+
+  def test_keys(self):
+    occurrences = [('cat', 1), ('cat', 5), ('dog', 5), ('cat', 9), ('dog', 2)]
+    unique_keys = occurrences | snippets.Keys()
+    self.assertEqual({'cat', 'dog'}, set(unique_keys))
+
+  def test_count(self):
+    occurrences = ['cat', 'dog', 'cat', 'cat', 'dog']
+    perkey_counts = occurrences | snippets.Count()
+    self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
new file mode 100644
index 0000000..67efb96
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -0,0 +1,61 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A streaming string-capitalization workflow.
+
+Important: streaming pipeline support in Python Dataflow is in development
+and is not yet available for use.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+  """Build and run the pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input_topic', dest='input_topic', required=True,
+      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+  parser.add_argument(
+      '--output_topic', dest='output_topic', required=True,
+      help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read from the input PubSub topic into a PCollection.
+  lines = p | df.io.Read(
+      'read', df.io.PubSubSource(known_args.input_topic))
+
+  # Capitalize the characters in each line.
+  transformed = (lines
+                 | (df.Map('capitalize', lambda x: x.upper())))
+
+  # Write to PubSub.
+  # pylint: disable=expression-not-assigned
+  transformed | df.io.Write(
+      'pubsub_write', df.io.PubSubSink(known_args.output_topic))
+
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
new file mode 100644
index 0000000..210d301
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -0,0 +1,71 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A streaming word-counting workflow.
+
+Important: streaming pipeline support in Python Dataflow is in development
+and is not yet available for use.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+
+import google.cloud.dataflow as df
+import google.cloud.dataflow.transforms.window as window
+
+
+def run(argv=None):
+  """Build and run the pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input_topic', required=True,
+      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+  parser.add_argument(
+      '--output_topic', required=True,
+      help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read from the PubSub input topic into a PCollection of lines.
+  lines = p | df.io.Read(
+      'read', df.io.PubSubSource(known_args.input_topic))
+
+  # Split each line into words, count the occurrences of each word within
+  # fixed windows, and format the (word, count) results.
+  transformed = (lines
+                 | (df.FlatMap('split',
+                               lambda x: re.findall(r'[A-Za-z\']+', x))
+                    .with_output_types(unicode))
+                 | df.Map('pair_with_one', lambda x: (x, 1))
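+                 # Assign each element to a fixed window before grouping; the
+                 # FixedWindows arguments (15, 0) are assumed to be the window
+                 # size in seconds and the window offset.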
+                 | df.WindowInto(window.FixedWindows(15, 0))
+                 | df.GroupByKey('group')
+                 | df.Map('count', lambda (word, ones): (word, sum(ones)))
+                 | df.Map('format', lambda tup: '%s: %d' % tup))
+
+  # Write to PubSub.
+  # pylint: disable=expression-not-assigned
+  transformed | df.io.Write(
+      'pubsub_write', df.io.PubSubSink(known_args.output_topic))
+
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
new file mode 100644
index 0000000..cf87268
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -0,0 +1,99 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
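+# Aggregators accumulate values across the whole pipeline run; their totals
+# are read back after the run via result.aggregated_values() at the end of
+# run() below.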
+empty_line_aggregator = df.Aggregator('emptyLines')
+average_word_size_aggregator = df.Aggregator('averageWordLength',
+                                             df.combiners.MeanCombineFn(),
+                                             float)
+
+
+class WordExtractingDoFn(df.DoFn):
+  """Parse each line of input text into words."""
+
+  def process(self, context):
+    """Returns an iterator over the words of this element.
+
+    The element is a line of text.  If the line is blank, note that, too.
+
+    Args:
+      context: the call-specific context with the element and aggregators.
+
+    Returns:
+      The words in this line of text.
+    """
+    text_line = context.element.strip()
+    if not text_line:
+      context.aggregate_to(empty_line_aggregator, 1)
+    words = re.findall(r'[A-Za-z\']+', text_line)
+    for w in words:
+      context.aggregate_to(average_word_size_aggregator, len(w))
+    return words
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the wordcount pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the text file[pattern] into a PCollection.
+  lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+
+  # Count the occurrences of each word.
+  counts = (lines
+            | (df.ParDo('split', WordExtractingDoFn())
+               .with_output_types(unicode))
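+            # with_output_types declares the element type produced by this
+            # step (unicode); the SDK can use this hint for type checking.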
+            | df.Map('pair_with_one', lambda x: (x, 1))
+            | df.GroupByKey('group')
+            | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+  # Format the counts into a PCollection of strings.
+  output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+
+  # Actually run the pipeline (all operations above are deferred).
+  result = p.run()
+  empty_line_values = result.aggregated_values(empty_line_aggregator)
+  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
+  word_length_values = result.aggregated_values(average_word_size_aggregator)
+  logging.info('average word lengths: %s', word_length_values.values())
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
new file mode 100644
index 0000000..66d4eb1
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -0,0 +1,154 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example that verifies the counts and includes Dataflow best practices.
+
+Building on the basic concepts in the wordcount example, this workflow
+introduces logging to Cloud Logging and the use of assertions in a Dataflow
+pipeline.
+
+To execute this pipeline locally, specify a local output file or output prefix
+on GCS::
+
+  --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration::
+
+  --project YOUR_PROJECT_ID
+  --staging_location gs://YOUR_STAGING_DIRECTORY
+  --temp_location gs://YOUR_TEMP_DIRECTORY
+  --job_name YOUR_JOB_NAME
+  --runner BlockingDataflowPipelineRunner
+
+and an output prefix on GCS::
+
+  --output gs://YOUR_OUTPUT_PREFIX
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+class FilterTextFn(df.DoFn):
+  """A DoFn that filters for a specific key based on a regular expression."""
+
+  # A custom aggregator can track values in your pipeline as it runs. Those
+  # values will be displayed in the Dataflow Monitoring UI when this pipeline is
+  # run using the Dataflow service. The aggregators below track the number of
+  # matched and unmatched words. Learn more about the Dataflow Monitoring UI at
+  # https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf.
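+  # Aggregator totals can also be read back after the run from the object
+  # returned by p.run() via aggregated_values(), as shown in wordcount.py.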
+  matched_words = df.Aggregator('matched_words')
+  unmatched_words = df.Aggregator('unmatched_words')
+
+  def __init__(self, pattern):
+    super(FilterTextFn, self).__init__()
+    self.pattern = pattern
+
+  def process(self, context):
+    word, _ = context.element
+    if re.match(self.pattern, word):
+      # Log at INFO level each element we match. When executing this pipeline
+      # using the Dataflow service, these log lines will appear in the Cloud
+      # Logging UI.
+      logging.info('Matched %s', word)
+      context.aggregate_to(self.matched_words, 1)
+      yield context.element
+    else:
+      # Log at the "DEBUG" level each element that is not matched. Different log
+      # levels can be used to control the verbosity of logging providing an
+      # effective mechanism to filter less important information.
+      # Note currently only "INFO" and higher level logs are emitted to the
+      # Cloud Logger. This log message will not be visible in the Cloud Logger.
+      logging.debug('Did not match %s', word)
+      context.aggregate_to(self.unmatched_words, 1)
+
+
+class CountWords(df.PTransform):
+  """A transform to count the occurrences of each word.
+
+  A PTransform that converts a PCollection containing lines of text into a
+  PCollection of (word, count) tuples.
+  """
+
+  def __init__(self):
+    super(CountWords, self).__init__()
+
+  def apply(self, pcoll):
+    return (pcoll
+            | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+               .with_output_types(unicode))
+            | df.Map('pair_with_one', lambda x: (x, 1))
+            | df.GroupByKey('group')
+            | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+
+def run(argv=None):
+  """Runs the debugging wordcount pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the text file[pattern] into a PCollection, count the occurrences of
+  # each word and filter by a list of words.
+  filtered_words = (
+      p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+      | CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+
+  # assert_that is a convenient PTransform that checks that a PCollection
+  # contains the expected values. Asserts are best used in unit tests with
+  # small data sets, but the technique is demonstrated here as a teaching tool.
+  #
+  # Note that assert_that does not produce any output and that successful
+  # completion of the pipeline implies that the expectations were met. Learn
+  # more about how to test your pipeline at
+  # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+  df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
+
+  # Format the counts into a PCollection of strings and write the output using a
+  # "Write" transform that has side effects.
+  # pylint: disable=unused-variable
+  output = (filtered_words
+            | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+            | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+if __name__ == '__main__':
+  # Cloud Logging receives only logs at INFO level and higher that are emitted
+  # by the root logger; those log statements will be visible in the Cloud
+  # Logging UI. Learn more about the Cloud Logging UI at
+  # https://cloud.google.com/logging.
+  #
+  # You can set the default logging level to a different level when running
+  # locally.
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_debugging_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
new file mode 100644
index 0000000..aa517d6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
@@ -0,0 +1,56 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the debugging wordcount example."""
+
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount_debugging
+
+
+class WordCountTest(unittest.TestCase):
+
+  SAMPLE_TEXT = 'xx yy Flourish\n zz Flourish Flourish stomach\n aa\n bb cc dd'
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def get_results(self, temp_path):
+    results = []
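+    # The output file name follows the sink's shard-naming pattern; a single
+    # shard is written here, hence the '-00000-of-00001' suffix.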
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
+        if match is not None:
+          results.append((match.group(1), int(match.group(2))))
+    return results
+
+  def test_basics(self):
+    temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+    expected_words = [('Flourish', 3), ('stomach', 1)]
+    wordcount_debugging.run([
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+
+    # Parse result file and compare.
+    results = self.get_results(temp_path)
+    self.assertEqual(sorted(results), sorted(expected_words))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
new file mode 100644
index 0000000..69f3986
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -0,0 +1,111 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A minimalist word-counting workflow that counts words in Shakespeare.
+
+This is the first in a series of successively more detailed 'word count'
+examples.
+
+Next, see the wordcount pipeline, then the wordcount_debugging pipeline, for
+more detailed examples that introduce additional concepts.
+
+Concepts:
+
+1. Reading data from text files
+2. Specifying 'inline' transforms
+3. Counting a PCollection
+4. Writing data to Cloud Storage as text files
+
+To execute this pipeline locally, first edit the code to specify the output
+location. The output location can be a local file path or an output prefix
+on GCS. (Only update the output location marked with the first CHANGE comment.)
+
+To execute this pipeline remotely, first edit the code to set your project ID,
+runner type, the staging location, the temp location, and the output location.
+The specified GCS bucket(s) must already exist. (Update all the places marked
+with a CHANGE comment.)
+
+Then, run the pipeline as described in the README. It will be deployed and run
+using the Google Cloud Dataflow Service. No args are required to run the
+pipeline. You can see the results in your output bucket in the GCS browser.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the wordcount pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      # CHANGE 1/5: The Google Cloud Storage path is required
+                      # for outputting the results.
+                      default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_args.extend([
+      # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
+      # run your pipeline on the Google Cloud Dataflow Service.
+      '--runner=DirectPipelineRunner',
+      # CHANGE 3/5: Your project ID is required in order to run your pipeline on
+      # the Google Cloud Dataflow Service.
+      '--project=SET_YOUR_PROJECT_ID_HERE',
+      # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
+      # files.
+      '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',
+      # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
+      # files.
+      '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',
+      '--job_name=your-wordcount-job',
+  ])
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the text file[pattern] into a PCollection.
+  lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+
+  # Count the occurrences of each word.
+  counts = (lines
+            | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+               .with_output_types(unicode))
+            | df.Map('pair_with_one', lambda x: (x, 1))
+            | df.GroupByKey('group')
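+            # After GroupByKey, each element is (word, iterable of 1s);
+            # summing the ones yields the count for that word.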
+            | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+  # Format the counts into a PCollection of strings.
+  output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+
+  # Actually run the pipeline (all operations above are deferred).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_minimal_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
new file mode 100644
index 0000000..f110c21
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
@@ -0,0 +1,56 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the minimal wordcount example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount_minimal
+
+
+class WordCountMinimalTest(unittest.TestCase):
+  """Unit test for wordcount_minimal example with direct runner."""
+
+  SAMPLE_TEXT = 'a b c a b a\n aa bb cc aa bb aa'
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def test_basics(self):
+    temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+    expected_words = collections.defaultdict(int)
+    for word in re.findall(r'\w+', self.SAMPLE_TEXT):
+      expected_words[word] += 1
+    wordcount_minimal.run([
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        match = re.search(r'([a-z]+): ([0-9]+)', line)
+        if match is not None:
+          results.append((match.group(1), int(match.group(2))))
+    self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py
new file mode 100644
index 0000000..72b1e32
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_test.py
@@ -0,0 +1,55 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the wordcount example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount
+
+
+class WordCountTest(unittest.TestCase):
+
+  SAMPLE_TEXT = 'a b c a b a\n\n aa bb cc aa bb aa'
+
+  def create_temp_file(self, contents):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      f.write(contents)
+      return f.name
+
+  def test_basics(self):
+    temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+    expected_words = collections.defaultdict(int)
+    for word in re.findall(r'\w+', self.SAMPLE_TEXT):
+      expected_words[word] += 1
+    wordcount.run([
+        '--input=%s*' % temp_path,
+        '--output=%s.result' % temp_path])
+    # Parse result file and compare.
+    results = []
+    with open(temp_path + '.result-00000-of-00001') as result_file:
+      for line in result_file:
+        match = re.search(r'([a-z]+): ([0-9]+)', line)
+        if match is not None:
+          results.append((match.group(1), int(match.group(2))))
+    self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/__init__.py b/sdks/python/apache_beam/internal/__init__.py
new file mode 100644
index 0000000..e69de29