Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:21 UTC
[46/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
new file mode 100644
index 0000000..f6bb63a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -0,0 +1,872 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Code snippets used in Cloud Dataflow webdocs.
+
+The examples here are written specifically to read well with the accompanying
+web docs from https://cloud.google.com/dataflow. Do not rewrite them unless you
+make sure the webdocs still read well and the rewritten code supports the
+concept being described. For example, there are snippets that could be shorter
+but they are written like this to make a specific point in the docs.
+
+The code snippets are all organized as self-contained functions. Parts of the
+function body delimited by [START tag] and [END tag] will be included
+automatically in the web docs. The naming convention for the tags is to prefix
+them with the PATH_TO_HTML where they are included, followed by a descriptive
+string. For instance, a code snippet that will be used as a code example at
+https://cloud.google.com/dataflow/model/pipelines will have the tag
+model_pipelines_DESCRIPTION. The tags can contain only letters, digits and _.
+"""
+
+import google.cloud.dataflow as df
+
+# Quiet some pylint warnings that happen because of the somewhat special
+# format for the code snippets.
+# pylint:disable=invalid-name
+# pylint:disable=expression-not-assigned
+# pylint:disable=redefined-outer-name
+# pylint:disable=unused-variable
+# pylint:disable=g-doc-args
+# pylint:disable=g-import-not-at-top
+
+
+class SnippetUtils(object):
+ from google.cloud.dataflow.pipeline import PipelineVisitor
+
+ class RenameFiles(PipelineVisitor):
+ """RenameFiles will rewire source and sink for unit testing.
+
+ RenameFiles will rewire the GCS files specified in the source and
+ sink in the snippet pipeline to local files so the pipeline can be run as a
+ unit test. This is as close as we can get to have code snippets that are
+ executed and are also ready to presented in webdocs.
+ """
+
+ def __init__(self, renames):
+ self.renames = renames
+
+ def visit_transform(self, transform_node):
+ if hasattr(transform_node.transform, 'source'):
+ source = transform_node.transform.source
+ source.file_path = self.renames['read']
+ source.is_gcs_source = False
+ elif hasattr(transform_node.transform, 'sink'):
+ sink = transform_node.transform.sink
+ sink.file_path = self.renames['write']
+ sink.is_gcs_sink = False
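+
+# In the unit tests, 'renames' maps the pipeline's read and write endpoints
+# to local files (hypothetical paths shown; see snippets_test.py), e.g.:
+#   p.visit(SnippetUtils.RenameFiles({'read': '/tmp/input.txt',
+#                                     'write': '/tmp/output.txt'}))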
+
+
+def construct_pipeline(renames):
+ """A reverse words snippet as an example for constructing a pipeline.
+
+ URL: https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline
+ """
+ import re
+
+ class ReverseWords(df.PTransform):
+ """A PTransform that reverses individual elements in a PCollection."""
+
+ def apply(self, pcoll):
+ return pcoll | df.Map(lambda e: e[::-1])
+
+ def filter_words(unused_x):
+ """Pass through filter to select everything."""
+ return True
+
+ # [START pipelines_constructing_creating]
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ p = df.Pipeline(options=PipelineOptions())
+ # [END pipelines_constructing_creating]
+
+ # [START pipelines_constructing_reading]
+ lines = p | df.io.Read('ReadMyFile',
+ df.io.TextFileSource('gs://some/inputData.txt'))
+ # [END pipelines_constructing_reading]
+
+ # [START pipelines_constructing_applying]
+ words = lines | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ reversed_words = words | ReverseWords()
+ # [END pipelines_constructing_applying]
+
+ # [START pipelines_constructing_writing]
+ filtered_words = reversed_words | df.Filter('FilterWords', filter_words)
+ filtered_words | df.io.Write('WriteMyFile',
+ df.io.TextFileSink('gs://some/outputData.txt'))
+ # [END pipelines_constructing_writing]
+
+ p.visit(SnippetUtils.RenameFiles(renames))
+
+ # [START pipelines_constructing_running]
+ p.run()
+ # [END pipelines_constructing_running]
+
+
+def model_pipelines(argv):
+ """A wordcount snippet as a simple pipeline example.
+
+ URL: https://cloud.google.com/dataflow/model/pipelines
+ """
+ # [START model_pipelines]
+ import re
+
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ class MyOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear'
+ '.txt',
+ help='Input file to process.')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+
+ pipeline_options = PipelineOptions(argv)
+ my_options = pipeline_options.view_as(MyOptions)
+
+ p = df.Pipeline(options=pipeline_options)
+
+ (p
+ | df.io.Read(df.io.TextFileSource(my_options.input))
+ | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ | df.Map(lambda x: (x, 1)) | df.combiners.Count.PerKey()
+ | df.io.Write(df.io.TextFileSink(my_options.output)))
+
+ p.run()
+ # [END model_pipelines]
+
+
+def model_pcollection(argv):
+ """Creating a PCollection from data in local memory.
+
+ URL: https://cloud.google.com/dataflow/model/pcollection
+ """
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ class MyOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+
+ pipeline_options = PipelineOptions(argv)
+ my_options = pipeline_options.view_as(MyOptions)
+
+ # [START model_pcollection]
+ p = df.Pipeline(options=pipeline_options)
+
+ (p
+ | df.Create([
+ 'To be, or not to be: that is the question: ',
+ 'Whether \'tis nobler in the mind to suffer ',
+ 'The slings and arrows of outrageous fortune, ',
+ 'Or to take arms against a sea of troubles, '])
+ | df.io.Write(df.io.TextFileSink(my_options.output)))
+
+ p.run()
+ # [END model_pcollection]
+
+
+def pipeline_options_remote(argv):
+ """"Creating a Pipeline using a PipelineOptions object for remote execution.
+
+ URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+ """
+
+ from google.cloud.dataflow import Pipeline
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START pipeline_options_create]
+ options = PipelineOptions(flags=argv)
+ # [END pipeline_options_create]
+
+ # [START pipeline_options_define_custom]
+ class MyOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--input')
+ parser.add_argument('--output')
+ # [END pipeline_options_define_custom]
+
+ from google.cloud.dataflow.utils.options import GoogleCloudOptions
+ from google.cloud.dataflow.utils.options import StandardOptions
+
+ # [START pipeline_options_dataflow_service]
+ # Create and set your PipelineOptions.
+ options = PipelineOptions(flags=argv)
+
+ # For Cloud execution, set the Cloud Platform project, job_name,
+ # staging location, temp_location and specify DataflowPipelineRunner or
+ # BlockingDataflowPipelineRunner.
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ google_cloud_options.project = 'my-project-id'
+ google_cloud_options.job_name = 'myjob'
+ google_cloud_options.staging_location = 'gs://my-bucket/binaries'
+ google_cloud_options.temp_location = 'gs://my-bucket/temp'
+ options.view_as(StandardOptions).runner = 'DataflowPipelineRunner'
+
+ # Create the Pipeline with the specified options.
+ p = Pipeline(options=options)
+ # [END pipeline_options_dataflow_service]
+
+ my_options = options.view_as(MyOptions)
+ my_input = my_options.input
+ my_output = my_options.output
+
+ # Overriding the runner for tests.
+ options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
+ p = Pipeline(options=options)
+
+ lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
+ lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
+
+ p.run()
+
+
+def pipeline_options_local(argv):
+ """"Creating a Pipeline using a PipelineOptions object for local execution.
+
+ URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+ """
+
+ from google.cloud.dataflow import Pipeline
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ options = PipelineOptions(flags=argv)
+
+ # [START pipeline_options_define_custom_with_help_and_default]
+ class MyOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--input',
+ help='Input for the dataflow pipeline',
+ default='gs://my-bucket/input')
+ parser.add_argument('--output',
+ help='Output for the dataflow pipeline',
+ default='gs://my-bucket/output')
+ # [END pipeline_options_define_custom_with_help_and_default]
+
+ my_options = options.view_as(MyOptions)
+
+ my_input = my_options.input
+ my_output = my_options.output
+
+ # [START pipeline_options_local]
+ # Create and set your Pipeline Options.
+ options = PipelineOptions()
+ p = Pipeline(options=options)
+ # [END pipeline_options_local]
+
+ lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
+ lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
+ p.run()
+
+
+def pipeline_options_command_line(argv):
+ """Creating a Pipeline by passing a list of arguments.
+
+ URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
+ """
+
+ # [START pipeline_options_command_line]
+ # Use Python argparse module to parse custom arguments
+ import argparse
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input')
+ parser.add_argument('--output')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ # Create the Pipeline with remaining arguments.
+ p = df.Pipeline(argv=pipeline_args)
+ lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(known_args.input))
+ lines | df.io.Write('WriteToText', df.io.TextFileSink(known_args.output))
+ # [END pipeline_options_command_line]
+
+ p.run()
+
+
+def pipeline_logging(lines, output):
+ """Logging Pipeline Messages.
+
+ URL: https://cloud.google.com/dataflow/pipelines/logging
+ """
+
+ import re
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START pipeline_logging]
+ # import Python logging module.
+ import logging
+
+ class ExtractWordsFn(df.DoFn):
+
+ def process(self, context):
+ words = re.findall(r'[A-Za-z\']+', context.element)
+ for word in words:
+ yield word
+
+ if word.lower() == 'love':
+ # Log using the root logger at info or higher levels
+ logging.info('Found : %s', word.lower())
+
+ # Remaining WordCount example code ...
+ # [END pipeline_logging]
+
+ p = df.Pipeline(options=PipelineOptions())
+ (p
+ | df.Create(lines)
+ | df.ParDo('ExtractWords', ExtractWordsFn())
+ | df.io.Write('WriteToText', df.io.TextFileSink(output)))
+
+ p.run()
+
+
+def pipeline_monitoring(renames):
+ """Using monitoring interface snippets.
+
+ URL: https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf
+ """
+
+ import re
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ class WordCountOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--input',
+ help='Input for the dataflow pipeline',
+ default='gs://my-bucket/input')
+ parser.add_argument('--output',
+ help='output for the dataflow pipeline',
+ default='gs://my-bucket/output')
+
+ class ExtractWordsFn(df.DoFn):
+
+ def process(self, context):
+ words = re.findall(r'[A-Za-z\']+', context.element)
+ for word in words:
+ yield word
+
+ class FormatCountsFn(df.DoFn):
+
+ def process(self, context):
+ word, count = context.element
+ yield '%s: %s' % (word, count)
+
+ # [START pipeline_monitoring_composite]
+ # The CountWords Composite Transform inside the WordCount pipeline.
+ class CountWords(df.PTransform):
+
+ def apply(self, pcoll):
+ return (pcoll
+ # Convert lines of text into individual words.
+ | df.ParDo('ExtractWords', ExtractWordsFn())
+ # Count the number of times each word occurs.
+ | df.combiners.Count.PerElement()
+ # Format each word and count into a printable string.
+ | df.ParDo('FormatCounts', FormatCountsFn()))
+ # [END pipeline_monitoring_composite]
+
+ pipeline_options = PipelineOptions()
+ options = pipeline_options.view_as(WordCountOptions)
+ p = df.Pipeline(options=pipeline_options)
+
+ # [START pipeline_monitoring_execution]
+ (p
+ # Read the lines of the input text.
+ | df.io.Read('ReadLines', df.io.TextFileSource(options.input))
+ # Count the words.
+ | CountWords()
+ # Write the formatted word counts to output.
+ | df.io.Write('WriteCounts', df.io.TextFileSink(options.output)))
+ # [END pipeline_monitoring_execution]
+
+ p.visit(SnippetUtils.RenameFiles(renames))
+ p.run()
+
+
+def examples_wordcount_minimal(renames):
+ """MinimalWordCount example snippets.
+
+ URL:
+ https://cloud.google.com/dataflow/examples/wordcount-example#MinimalWordCount
+ """
+ import re
+
+ import google.cloud.dataflow as df
+
+ from google.cloud.dataflow.utils.options import GoogleCloudOptions
+ from google.cloud.dataflow.utils.options import StandardOptions
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START examples_wordcount_minimal_options]
+ options = PipelineOptions()
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ google_cloud_options.project = 'my-project-id'
+ google_cloud_options.job_name = 'myjob'
+ google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
+ google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
+ options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
+ # [END examples_wordcount_minimal_options]
+
+ # Run it locally for testing.
+ options = PipelineOptions()
+
+ # [START examples_wordcount_minimal_create]
+ p = df.Pipeline(options=options)
+ # [END examples_wordcount_minimal_create]
+
+ (
+ # [START examples_wordcount_minimal_read]
+ p | df.io.Read(df.io.TextFileSource(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+ # [END examples_wordcount_minimal_read]
+
+ # [START examples_wordcount_minimal_pardo]
+ | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ # [END examples_wordcount_minimal_pardo]
+
+ # [START examples_wordcount_minimal_count]
+ | df.combiners.Count.PerElement()
+ # [END examples_wordcount_minimal_count]
+
+ # [START examples_wordcount_minimal_map]
+ | df.Map(lambda (word, count): '%s: %s' % (word, count))
+ # [END examples_wordcount_minimal_map]
+
+ # [START examples_wordcount_minimal_write]
+ | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
+ # [END examples_wordcount_minimal_write]
+ )
+
+ p.visit(SnippetUtils.RenameFiles(renames))
+
+ # [START examples_wordcount_minimal_run]
+ p.run()
+ # [END examples_wordcount_minimal_run]
+
+
+def examples_wordcount_wordcount(renames):
+ """WordCount example snippets.
+
+ URL:
+ https://cloud.google.com/dataflow/examples/wordcount-example#WordCount
+ """
+ import re
+
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ argv = []
+
+ # [START examples_wordcount_wordcount_options]
+ class WordCountOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--input',
+ help='Input for the dataflow pipeline',
+ default='gs://my-bucket/input')
+
+ options = PipelineOptions(argv)
+ p = df.Pipeline(options=options)
+ # [END examples_wordcount_wordcount_options]
+
+ lines = p | df.io.Read(df.io.TextFileSource(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+
+ # [START examples_wordcount_wordcount_composite]
+ class CountWords(df.PTransform):
+
+ def apply(self, pcoll):
+ return (pcoll
+ # Convert lines of text into individual words.
+ | df.FlatMap(
+ 'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+
+ # Count the number of times each word occurs.
+ | df.combiners.Count.PerElement())
+
+ counts = lines | CountWords()
+ # [END examples_wordcount_wordcount_composite]
+
+ # [START examples_wordcount_wordcount_dofn]
+ class FormatAsTextFn(df.DoFn):
+
+ def process(self, context):
+ word, count = context.element
+ yield '%s: %s' % (word, count)
+
+ formatted = counts | df.ParDo(FormatAsTextFn())
+ # [END examples_wordcount_wordcount_dofn]
+
+ formatted | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
+ p.visit(SnippetUtils.RenameFiles(renames))
+ p.run()
+
+
+def examples_wordcount_debugging(renames):
+ """DebuggingWordCount example snippets.
+
+ URL:
+ https://cloud.google.com/dataflow/examples/wordcount-example#DebuggingWordCount
+ """
+ import re
+
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START example_wordcount_debugging_logging]
+ # [START example_wordcount_debugging_aggregators]
+ import logging
+
+ class FilterTextFn(df.DoFn):
+ """A DoFn that filters for a specific key based on a regular expression."""
+
+    # A custom aggregator can track values in your pipeline as it runs. Create
+    # custom aggregators matched_words and unmatched_words.
+    matched_words = df.Aggregator('matched_words')
+    unmatched_words = df.Aggregator('unmatched_words')
+
+ def __init__(self, pattern):
+ self.pattern = pattern
+
+ def process(self, context):
+ word, _ = context.element
+ if re.match(self.pattern, word):
+ # Log at INFO level each element we match. When executing this pipeline
+ # using the Dataflow service, these log lines will appear in the Cloud
+ # Logging UI.
+ logging.info('Matched %s', word)
+
+ # Add 1 to the custom aggregator matched_words
+ context.aggregate_to(self.matched_words, 1)
+ yield context.element
+ else:
+ # Log at the "DEBUG" level each element that is not matched. Different
+ # log levels can be used to control the verbosity of logging providing
+ # an effective mechanism to filter less important information. Note
+ # currently only "INFO" and higher level logs are emitted to the Cloud
+ # Logger. This log message will not be visible in the Cloud Logger.
+ logging.debug('Did not match %s', word)
+
+        # Add 1 to the custom aggregator unmatched_words
+        context.aggregate_to(self.unmatched_words, 1)
+ # [END example_wordcount_debugging_logging]
+ # [END example_wordcount_debugging_aggregators]
+
+ p = df.Pipeline(options=PipelineOptions())
+ filtered_words = (
+ p
+ | df.io.Read(df.io.TextFileSource(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+ | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | df.combiners.Count.PerElement()
+ | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+
+ # [START example_wordcount_debugging_assert]
+ df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
+ # [END example_wordcount_debugging_assert]
+
+ output = (filtered_words
+ | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ | df.io.Write(
+ 'write', df.io.TextFileSink('gs://my-bucket/counts.txt')))
+
+ p.visit(SnippetUtils.RenameFiles(renames))
+ p.run()
+
+
+def model_textio(renames):
+ """Using a Read and Write transform to read/write text files.
+
+ URLs:
+ https://cloud.google.com/dataflow/model/pipeline-io
+ https://cloud.google.com/dataflow/model/text-io
+ """
+ def filter_words(x):
+ import re
+ return re.findall(r'[A-Za-z\']+', x)
+
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START model_textio_read]
+ p = df.Pipeline(options=PipelineOptions())
+ # [START model_pipelineio_read]
+ lines = p | df.io.Read(
+ 'ReadFromText',
+ df.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
+ # [END model_pipelineio_read]
+ # [END model_textio_read]
+
+ # [START model_textio_write]
+ filtered_words = lines | df.FlatMap('FilterWords', filter_words)
+ # [START model_pipelineio_write]
+ filtered_words | df.io.Write(
+ 'WriteToText', df.io.TextFileSink('gs://my_bucket/path/to/numbers',
+ file_name_suffix='.csv'))
+ # [END model_pipelineio_write]
+ # [END model_textio_write]
+
+ p.visit(SnippetUtils.RenameFiles(renames))
+ p.run()
+
+
+def model_bigqueryio():
+ """Using a Read and Write transform to read/write to BigQuery.
+
+ URL: https://cloud.google.com/dataflow/model/bigquery-io
+ """
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+
+ # [START model_bigqueryio_read]
+ p = df.Pipeline(options=PipelineOptions())
+ weather_data = p | df.io.Read(
+ 'ReadWeatherStations',
+ df.io.BigQuerySource(
+ 'clouddataflow-readonly:samples.weather_stations'))
+ # [END model_bigqueryio_read]
+
+ # [START model_bigqueryio_query]
+ p = df.Pipeline(options=PipelineOptions())
+ weather_data = p | df.io.Read(
+ 'ReadYearAndTemp',
+ df.io.BigQuerySource(
+ query='SELECT year, mean_temp FROM samples.weather_stations'))
+ # [END model_bigqueryio_query]
+
+ # [START model_bigqueryio_schema]
+ schema = 'source:STRING, quote:STRING'
+ # [END model_bigqueryio_schema]
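+  # The schema string is a comma-separated list of 'fieldName:TYPE' pairs
+  # describing the columns of the destination BigQuery table.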
+
+ # [START model_bigqueryio_write]
+ quotes = p | df.Create(
+      [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'}])
+ quotes | df.io.Write(
+ 'Write', df.io.BigQuerySink(
+ 'my-project:output.output_table',
+ schema=schema,
+ write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE,
+ create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED))
+ # [END model_bigqueryio_write]
+
+
+def model_composite_transform_example(contents, output_path):
+ """Example of a composite transform.
+
+ To declare a composite transform, define a subclass of PTransform.
+
+ To override the apply method, define a method "apply" that
+ takes a PCollection as its only parameter and returns a PCollection.
+
+ URL: https://cloud.google.com/dataflow/model/composite-transforms
+ """
+ import re
+
+ import google.cloud.dataflow as df
+
+ # [START composite_transform_example]
+ # [START composite_ptransform_apply_method]
+ # [START composite_ptransform_declare]
+ class CountWords(df.PTransform):
+ # [END composite_ptransform_declare]
+
+ def apply(self, pcoll):
+ return (pcoll
+ | df.FlatMap(lambda x: re.findall(r'\w+', x))
+ | df.combiners.Count.PerElement()
+ | df.Map(lambda (word, c): '%s: %s' % (word, c)))
+ # [END composite_ptransform_apply_method]
+ # [END composite_transform_example]
+
+ from google.cloud.dataflow.utils.options import PipelineOptions
+ p = df.Pipeline(options=PipelineOptions())
+ (p
+ | df.Create(contents)
+ | CountWords()
+ | df.io.Write(df.io.TextFileSink(output_path)))
+ p.run()
+
+
+def model_multiple_pcollections_flatten(contents, output_path):
+ """Merging a PCollection with Flatten.
+
+ URL: https://cloud.google.com/dataflow/model/multiple-pcollections
+ """
+ some_hash_fn = lambda s: ord(s[0])
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+ p = df.Pipeline(options=PipelineOptions())
+ partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
+
+  # Partition the input into 3 parts.
+ partitioned = p | df.Create(contents) | df.Partition(partition_fn, 3)
+ pcoll1 = partitioned[0]
+ pcoll2 = partitioned[1]
+ pcoll3 = partitioned[2]
+
+  # Flatten them back into one PCollection.
+
+ # A collection of PCollection objects can be represented simply
+ # as a tuple (or list) of PCollections.
+ # (The SDK for Python has no separate type to store multiple
+ # PCollection objects, whether containing the same or different
+ # types.)
+ # [START model_multiple_pcollections_flatten]
+ merged = (
+ # [START model_multiple_pcollections_tuple]
+ (pcoll1, pcoll2, pcoll3)
+ # [END model_multiple_pcollections_tuple]
+ # A list of tuples can be "piped" directly into a Flatten transform.
+ | df.Flatten())
+ # [END model_multiple_pcollections_flatten]
+ merged | df.io.Write(df.io.TextFileSink(output_path))
+
+ p.run()
+
+
+def model_multiple_pcollections_partition(contents, output_path):
+ """Splitting a PCollection with Partition.
+
+ URL: https://cloud.google.com/dataflow/model/multiple-pcollections
+ """
+ some_hash_fn = lambda s: ord(s[0])
+ def get_percentile(i):
+ """Assume i in [0,100)."""
+ return i
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+ p = df.Pipeline(options=PipelineOptions())
+
+ students = p | df.Create(contents)
+ # [START model_multiple_pcollections_partition]
+ def partition_fn(student, num_partitions):
+ return int(get_percentile(student) * num_partitions / 100)
+
+ by_decile = students | df.Partition(partition_fn, 10)
+ # [END model_multiple_pcollections_partition]
+ # [START model_multiple_pcollections_partition_40th]
+ fortieth_percentile = by_decile[4]
+ # [END model_multiple_pcollections_partition_40th]
+
+ ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
+ | df.Flatten()
+ | df.io.Write(df.io.TextFileSink(output_path)))
+
+ p.run()
+
+
+def model_group_by_key(contents, output_path):
+ """Applying a GroupByKey Transform.
+
+ URL: https://cloud.google.com/dataflow/model/group-by-key
+ """
+ import re
+
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+ p = df.Pipeline(options=PipelineOptions())
+ words_and_counts = (
+ p
+ | df.Create(contents)
+ | df.FlatMap(lambda x: re.findall(r'\w+', x))
+ | df.Map('one word', lambda w: (w, 1)))
+ # GroupByKey accepts a PCollection of (w, 1) and
+ # outputs a PCollection of (w, (1, 1, ...)).
+ # (A key/value pair is just a tuple in Python.)
+ # This is a somewhat forced example, since one could
+ # simply use df.combiners.Count.PerElement here.
+ # [START model_group_by_key_transform]
+ grouped_words = words_and_counts | df.GroupByKey()
+ # [END model_group_by_key_transform]
+ (grouped_words
+ | df.Map('count words', lambda (word, counts): (word, len(counts)))
+ | df.io.Write(df.io.TextFileSink(output_path)))
+ p.run()
+
+
+def model_co_group_by_key_tuple(email_list, phone_list, output_path):
+ """Applying a CoGroupByKey Transform to a tuple.
+
+ URL: https://cloud.google.com/dataflow/model/group-by-key
+ """
+ import google.cloud.dataflow as df
+ from google.cloud.dataflow.utils.options import PipelineOptions
+ p = df.Pipeline(options=PipelineOptions())
+ # [START model_group_by_key_cogroupbykey_tuple]
+ # Each data set is represented by key-value pairs in separate PCollections.
+ # Both data sets share a common key type (in this example str).
+ # The email_list contains values such as: ('joe', 'joe@example.com') with
+ # multiple possible values for each key.
+  # The phone_list contains values such as: ('mary', '111-222-3333') with
+ # multiple possible values for each key.
+ emails = p | df.Create('email', email_list)
+ phones = p | df.Create('phone', phone_list)
+ # The result PCollection contains one key-value element for each key in the
+ # input PCollections. The key of the pair will be the key from the input and
+  # the value will be a dictionary with two entries: 'emails', an iterable of
+  # all values for the current key in the emails PCollection, and 'phones', an
+  # iterable of all values for the current key in the phones PCollection.
+ # For instance, if 'emails' contained ('joe', 'joe@example.com') and
+ # ('joe', 'joe@gmail.com'), then 'result' will contain the element
+ # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
+ result = {'emails': emails, 'phones': phones} | df.CoGroupByKey()
+
+ def join_info((name, info)):
+ return '; '.join(['%s' % name,
+ '%s' % ','.join(info['emails']),
+ '%s' % ','.join(info['phones'])])
+
+ contact_lines = result | df.Map(join_info)
+ # [END model_group_by_key_cogroupbykey_tuple]
+ contact_lines | df.io.Write(df.io.TextFileSink(output_path))
+ p.run()
+
+
+# [START model_library_transforms_keys]
+class Keys(df.PTransform):
+
+ def apply(self, pcoll):
+ return pcoll | df.Map('Keys', lambda (k, v): k)
+# [END model_library_transforms_keys]
+# pylint: enable=invalid-name
+
+
+# [START model_library_transforms_count]
+class Count(df.PTransform):
+
+ def apply(self, pcoll):
+ return (
+ pcoll
+ | df.Map('Init', lambda v: (v, 1))
+ | df.CombinePerKey(sum))
+# [END model_library_transforms_count]
+# pylint: enable=g-wrong-blank-lines
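+
+# A sketch of how the library transforms above are used (the PCollection
+# names here are hypothetical; see snippets_test.py for real usage):
+#   unique_keys = pcoll_of_key_value_pairs | Keys()
+#   per_element_counts = pcoll_of_elements | Count()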
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
new file mode 100644
index 0000000..4c2014f
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -0,0 +1,560 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for all code snippets used in public docs."""
+
+import logging
+import sys
+import tempfile
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import io
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.examples.snippets import snippets
+from google.cloud.dataflow.io import fileio
+from google.cloud.dataflow.utils.options import TypeOptions
+
+
+# Monkey-patch to use the native sink for file path re-writing.
+io.TextFileSink = fileio.NativeTextFileSink
+
+
+class ParDoTest(unittest.TestCase):
+ """Tests for dataflow/model/par-do."""
+
+ def test_pardo(self):
+ # Note: "words" and "ComputeWordLengthFn" are referenced by name in
+ # the text of the doc.
+
+ words = ['aa', 'bbb', 'c']
+ # [START model_pardo_pardo]
+ class ComputeWordLengthFn(df.DoFn):
+ def process(self, context):
+ return [len(context.element)]
+ # [END model_pardo_pardo]
+
+ # [START model_pardo_apply]
+ # Apply a ParDo to the PCollection "words" to compute lengths for each word.
+ word_lengths = words | df.ParDo(ComputeWordLengthFn())
+ # [END model_pardo_apply]
+ self.assertEqual({2, 3, 1}, set(word_lengths))
+
+ def test_pardo_yield(self):
+ words = ['aa', 'bbb', 'c']
+ # [START model_pardo_yield]
+ class ComputeWordLengthFn(df.DoFn):
+ def process(self, context):
+ yield len(context.element)
+ # [END model_pardo_yield]
+
+ word_lengths = words | df.ParDo(ComputeWordLengthFn())
+ self.assertEqual({2, 3, 1}, set(word_lengths))
+
+ def test_pardo_using_map(self):
+ words = ['aa', 'bbb', 'c']
+ # [START model_pardo_using_map]
+ word_lengths = words | df.Map(len)
+ # [END model_pardo_using_map]
+
+ self.assertEqual({2, 3, 1}, set(word_lengths))
+
+ def test_pardo_using_flatmap(self):
+ words = ['aa', 'bbb', 'c']
+ # [START model_pardo_using_flatmap]
+ word_lengths = words | df.FlatMap(lambda word: [len(word)])
+ # [END model_pardo_using_flatmap]
+
+ self.assertEqual({2, 3, 1}, set(word_lengths))
+
+ def test_pardo_using_flatmap_yield(self):
+ words = ['aA', 'bbb', 'C']
+ # [START model_pardo_using_flatmap_yield]
+ def capitals(word):
+ for letter in word:
+ if 'A' <= letter <= 'Z':
+ yield letter
+ all_capitals = words | df.FlatMap(capitals)
+ # [END model_pardo_using_flatmap_yield]
+
+ self.assertEqual({'A', 'C'}, set(all_capitals))
+
+ def test_pardo_with_label(self):
+ words = ['aa', 'bbc', 'defg']
+ # [START model_pardo_with_label]
+ result = words | df.Map('CountUniqueLetters', lambda word: len(set(word)))
+ # [END model_pardo_with_label]
+
+ self.assertEqual({1, 2, 4}, set(result))
+
+ def test_pardo_side_input(self):
+ p = df.Pipeline('DirectPipelineRunner')
+ words = p | df.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+
+ # [START model_pardo_side_input]
+ # Callable takes additional arguments.
+ def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+ if lower_bound <= len(word) <= upper_bound:
+ yield word
+
+ # Construct a deferred side input.
+ avg_word_len = words | df.Map(len) | df.CombineGlobally(df.combiners.MeanCombineFn())
+
+ # Call with explicit side inputs.
+ small_words = words | df.FlatMap('small', filter_using_length, 0, 3)
+
+ # A single deferred side input.
+ larger_than_average = words | df.FlatMap('large',
+ filter_using_length,
+ lower_bound=pvalue.AsSingleton(avg_word_len))
+
+ # Mix and match.
+ small_but_nontrivial = words | df.FlatMap(filter_using_length,
+ lower_bound=2,
+ upper_bound=pvalue.AsSingleton(avg_word_len))
+ # [END model_pardo_side_input]
+
+ df.assert_that(small_words, df.equal_to(['a', 'bb', 'ccc']))
+ df.assert_that(larger_than_average, df.equal_to(['ccc', 'dddd']),
+ label='larger_than_average')
+ df.assert_that(small_but_nontrivial, df.equal_to(['bb']),
+ label='small_but_not_trivial')
+ p.run()
+
+ def test_pardo_side_input_dofn(self):
+ words = ['a', 'bb', 'ccc', 'dddd']
+
+ # [START model_pardo_side_input_dofn]
+ class FilterUsingLength(df.DoFn):
+ def process(self, context, lower_bound, upper_bound=float('inf')):
+ if lower_bound <= len(context.element) <= upper_bound:
+ yield context.element
+
+ small_words = words | df.ParDo(FilterUsingLength(), 0, 3)
+ # [END model_pardo_side_input_dofn]
+ self.assertEqual({'a', 'bb', 'ccc'}, set(small_words))
+
+ def test_pardo_with_side_outputs(self):
+ # [START model_pardo_emitting_values_on_side_outputs]
+ class ProcessWords(df.DoFn):
+
+ def process(self, context, cutoff_length, marker):
+ if len(context.element) <= cutoff_length:
+ # Emit this short word to the main output.
+ yield context.element
+ else:
+ # Emit this word's long length to a side output.
+ yield pvalue.SideOutputValue(
+ 'above_cutoff_lengths', len(context.element))
+ if context.element.startswith(marker):
+ # Emit this word to a different side output.
+ yield pvalue.SideOutputValue('marked strings', context.element)
+ # [END model_pardo_emitting_values_on_side_outputs]
+
+ words = ['a', 'an', 'the', 'music', 'xyz']
+
+ # [START model_pardo_with_side_outputs]
+ results = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
+ .with_outputs('above_cutoff_lengths',
+ 'marked strings',
+ main='below_cutoff_strings'))
+ below = results.below_cutoff_strings
+ above = results.above_cutoff_lengths
+ marked = results['marked strings'] # indexing works as well
+ # [END model_pardo_with_side_outputs]
+
+ self.assertEqual({'a', 'an'}, set(below))
+ self.assertEqual({3, 5}, set(above))
+ self.assertEqual({'xyz'}, set(marked))
+
+ # [START model_pardo_with_side_outputs_iter]
+ below, above, marked = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
+ .with_outputs('above_cutoff_lengths',
+ 'marked strings',
+ main='below_cutoff_strings'))
+ # [END model_pardo_with_side_outputs_iter]
+
+ self.assertEqual({'a', 'an'}, set(below))
+ self.assertEqual({3, 5}, set(above))
+ self.assertEqual({'xyz'}, set(marked))
+
+ def test_pardo_with_undeclared_side_outputs(self):
+ numbers = [1, 2, 3, 4, 5, 10, 20]
+ # [START model_pardo_with_side_outputs_undeclared]
+ def even_odd(x):
+ yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
+ if x % 10 == 0:
+ yield x
+
+ results = numbers | df.FlatMap(even_odd).with_outputs()
+
+ evens = results.even
+ odds = results.odd
+ tens = results[None] # the undeclared main output
+ # [END model_pardo_with_side_outputs_undeclared]
+
+ self.assertEqual({2, 4, 10, 20}, set(evens))
+ self.assertEqual({1, 3, 5}, set(odds))
+ self.assertEqual({10, 20}, set(tens))
+
+
+class TypeHintsTest(unittest.TestCase):
+
+ def test_bad_types(self):
+ p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+
+ # [START type_hints_missing_define_numbers]
+ numbers = p | df.Create(['1', '2', '3'])
+ # [END type_hints_missing_define_numbers]
+
+ # Consider the following code.
+ # [START type_hints_missing_apply]
+ evens = numbers | df.Filter(lambda x: x % 2 == 0)
+ # [END type_hints_missing_apply]
+
+    # Now suppose numbers was defined as in the snippet above.
+ # When running this pipeline, you'd get a runtime error,
+ # possibly on a remote machine, possibly very late.
+
+ with self.assertRaises(TypeError):
+ p.run()
+
+ # To catch this early, we can assert what types we expect.
+ with self.assertRaises(typehints.TypeCheckError):
+ # [START type_hints_takes]
+ p.options.view_as(TypeOptions).pipeline_type_check = True
+ evens = numbers | df.Filter(lambda x: x % 2 == 0).with_input_types(int)
+ # [END type_hints_takes]
+
+    # Type hints can be declared on DoFns and callables as well, rather
+    # than where they're used, to be more self-contained.
+ with self.assertRaises(typehints.TypeCheckError):
+ # [START type_hints_do_fn]
+ @df.typehints.with_input_types(int)
+ class FilterEvensDoFn(df.DoFn):
+ def process(self, context):
+ if context.element % 2 == 0:
+ yield context.element
+ evens = numbers | df.ParDo(FilterEvensDoFn())
+ # [END type_hints_do_fn]
+
+ words = p | df.Create('words', ['a', 'bb', 'c'])
+    # One can declare output types on transforms as well. This helps
+    # document the contract and checks it at pipeline construction time.
+ # [START type_hints_transform]
+ T = df.typehints.TypeVariable('T')
+ @df.typehints.with_input_types(T)
+ @df.typehints.with_output_types(df.typehints.Tuple[int, T])
+ class MyTransform(df.PTransform):
+ def apply(self, pcoll):
+ return pcoll | df.Map(lambda x: (len(x), x))
+
+ words_with_lens = words | MyTransform()
+ # [END type_hints_transform]
+
+ with self.assertRaises(typehints.TypeCheckError):
+ words_with_lens | df.Map(lambda x: x).with_input_types(
+ df.typehints.Tuple[int, int])
+
+ def test_runtime_checks_off(self):
+ p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+ # [START type_hints_runtime_off]
+ p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
+ p.run()
+ # [END type_hints_runtime_off]
+
+ def test_runtime_checks_on(self):
+ p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+ with self.assertRaises(typehints.TypeCheckError):
+ # [START type_hints_runtime_on]
+ p.options.view_as(TypeOptions).runtime_type_check = True
+ p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
+ p.run()
+ # [END type_hints_runtime_on]
+
+ def test_deterministic_key(self):
+ p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
+ lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']
+
+ # [START type_hints_deterministic_key]
+ class Player(object):
+ def __init__(self, team, name):
+ self.team = team
+ self.name = name
+
+ class PlayerCoder(df.coders.Coder):
+ def encode(self, player):
+ return '%s:%s' % (player.team, player.name)
+
+ def decode(self, s):
+ return Player(*s.split(':'))
+
+ def is_deterministic(self):
+ return True
+
+ df.coders.registry.register_coder(Player, PlayerCoder)
+
+ def parse_player_and_score(csv):
+ name, team, score = csv.split(',')
+ return Player(team, name), int(score)
+
+ totals = (
+ lines
+ | df.Map(parse_player_and_score)
+ | df.CombinePerKey(sum).with_input_types(df.typehints.Tuple[Player, int]))
+ # [END type_hints_deterministic_key]
+
+    self.assertEqual(
+ {('banana', 3), ('kiwi', 4), ('zucchini', 3)},
+ set(totals | df.Map(lambda (k, v): (k.name, v))))
+
+
+class SnippetsTest(unittest.TestCase):
+
+ def create_temp_file(self, contents=''):
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write(contents)
+ return f.name
+
+ def get_output(self, path, sorted_output=True, suffix=''):
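+    # TextFileSink writes sharded output files; with a single shard, the
+    # default shard naming yields the '-00000-of-00001' suffix opened here.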
+ with open(path + '-00000-of-00001' + suffix) as f:
+ lines = f.readlines()
+ if sorted_output:
+ return sorted(s.rstrip('\n') for s in lines)
+ else:
+ return [s.rstrip('\n') for s in lines]
+
+ def test_model_pipelines(self):
+ temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
+ result_path = temp_path + '.result'
+ snippets.model_pipelines([
+ '--input=%s*' % temp_path,
+ '--output=%s' % result_path])
+ self.assertEqual(
+ self.get_output(result_path),
+ [str(s) for s in [(u'aa', 1), (u'bb', 2), (u'cc', 3)]])
+
+ def test_model_pcollection(self):
+ temp_path = self.create_temp_file()
+ snippets.model_pcollection(['--output=%s' % temp_path])
+ self.assertEqual(self.get_output(temp_path, sorted_output=False), [
+ 'To be, or not to be: that is the question: ',
+ 'Whether \'tis nobler in the mind to suffer ',
+ 'The slings and arrows of outrageous fortune, ',
+ 'Or to take arms against a sea of troubles, '])
+
+ def test_construct_pipeline(self):
+ temp_path = self.create_temp_file(
+ 'abc def ghi\n jkl mno pqr\n stu vwx yz')
+ result_path = self.create_temp_file()
+ snippets.construct_pipeline({'read': temp_path, 'write': result_path})
+ self.assertEqual(
+ self.get_output(result_path),
+ ['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy'])
+
+ def test_model_textio(self):
+ temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
+ result_path = temp_path + '.result'
+ snippets.model_textio({'read': temp_path, 'write': result_path})
+ self.assertEqual(
+ ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
+ self.get_output(result_path, suffix='.csv'))
+
+ def test_model_bigqueryio(self):
+    # We cannot test BigQueryIO functionality in unit tests; therefore, we limit
+ # ourselves to making sure the pipeline containing BigQuery sources and
+ # sinks can be built.
+ self.assertEqual(None, snippets.model_bigqueryio())
+
+ def _run_test_pipeline_for_options(self, fn):
+ temp_path = self.create_temp_file('aa\nbb\ncc')
+ result_path = temp_path + '.result'
+ fn([
+ '--input=%s*' % temp_path,
+ '--output=%s' % result_path])
+ self.assertEqual(
+ ['aa', 'bb', 'cc'],
+ self.get_output(result_path))
+
+ def test_pipeline_options_local(self):
+ self._run_test_pipeline_for_options(snippets.pipeline_options_local)
+
+ def test_pipeline_options_remote(self):
+ self._run_test_pipeline_for_options(snippets.pipeline_options_remote)
+
+ def test_pipeline_options_command_line(self):
+ self._run_test_pipeline_for_options(snippets.pipeline_options_command_line)
+
+ def test_pipeline_logging(self):
+ result_path = self.create_temp_file()
+ lines = ['we found love right where we are',
+ 'we found love right from the start',
+ 'we found love in a hopeless place']
+ snippets.pipeline_logging(lines, result_path)
+ self.assertEqual(
+ sorted(' '.join(lines).split(' ')),
+ self.get_output(result_path))
+
+ def test_examples_wordcount(self):
+ pipelines = [snippets.examples_wordcount_minimal,
+ snippets.examples_wordcount_wordcount,
+ snippets.pipeline_monitoring]
+
+ for pipeline in pipelines:
+ temp_path = self.create_temp_file(
+ 'abc def ghi\n abc jkl')
+ result_path = self.create_temp_file()
+ pipeline({'read': temp_path, 'write': result_path})
+ self.assertEqual(
+ self.get_output(result_path),
+ ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1'])
+
+ def test_examples_wordcount_debugging(self):
+ temp_path = self.create_temp_file(
+ 'Flourish Flourish Flourish stomach abc def')
+ result_path = self.create_temp_file()
+ snippets.examples_wordcount_debugging(
+ {'read': temp_path, 'write': result_path})
+ self.assertEqual(
+ self.get_output(result_path),
+ ['Flourish: 3', 'stomach: 1'])
+
+ def test_model_composite_transform_example(self):
+ contents = ['aa bb cc', 'bb cc', 'cc']
+ result_path = self.create_temp_file()
+ snippets.model_composite_transform_example(contents, result_path)
+ self.assertEqual(['aa: 1', 'bb: 2', 'cc: 3'], self.get_output(result_path))
+
+ def test_model_multiple_pcollections_flatten(self):
+ contents = ['a', 'b', 'c', 'd', 'e', 'f']
+ result_path = self.create_temp_file()
+ snippets.model_multiple_pcollections_flatten(contents, result_path)
+ self.assertEqual(contents, self.get_output(result_path))
+
+ def test_model_multiple_pcollections_partition(self):
+ contents = [17, 42, 64, 32, 0, 99, 53, 89]
+ result_path = self.create_temp_file()
+ snippets.model_multiple_pcollections_partition(contents, result_path)
+ self.assertEqual(['0', '17', '32', '42', '53', '64', '89', '99'],
+ self.get_output(result_path))
+
+ def test_model_group_by_key(self):
+ contents = ['a bb ccc bb bb a']
+ result_path = self.create_temp_file()
+ snippets.model_group_by_key(contents, result_path)
+ expected = [('a', 2), ('bb', 3), ('ccc', 1)]
+ self.assertEqual([str(s) for s in expected], self.get_output(result_path))
+
+ def test_model_co_group_by_key_tuple(self):
+ email_list = [['a', 'a@example.com'], ['b', 'b@example.com']]
+ phone_list = [['a', 'x4312'], ['b', 'x8452']]
+ result_path = self.create_temp_file()
+ snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
+ expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
+ self.assertEqual(expect, self.get_output(result_path))
+
+
+class CombineTest(unittest.TestCase):
+ """Tests for dataflow/model/combine."""
+
+ def test_global_sum(self):
+ pc = [1, 2, 3]
+ # [START global_sum]
+ result = pc | df.CombineGlobally(sum)
+ # [END global_sum]
+ self.assertEqual([6], result)
+
+ def test_combine_values(self):
+    occurrences = [('cat', 1), ('cat', 5), ('cat', 9), ('dog', 5), ('dog', 2)]
+    # [START combine_values]
+    first_occurrences = occurrences | df.GroupByKey() | df.CombineValues(min)
+    # [END combine_values]
+    self.assertEqual({('cat', 1), ('dog', 2)}, set(first_occurrences))
+
+ def test_combine_per_key(self):
+ player_accuracies = [
+ ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1),
+ ('dog', 5), ('dog', 2)]
+ # [START combine_per_key]
+ avg_accuracy_per_player = player_accuracies | df.CombinePerKey(df.combiners.MeanCombineFn())
+ # [END combine_per_key]
+ self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player))
+
+ def test_combine_concat(self):
+ pc = ['a', 'b']
+ # [START combine_concat]
+ def concat(values, separator=', '):
+ return separator.join(values)
+ with_commas = pc | df.CombineGlobally(concat)
+ with_dashes = pc | df.CombineGlobally(concat, separator='-')
+ # [END combine_concat]
+ self.assertEqual(1, len(with_commas))
+ self.assertTrue(with_commas[0] in {'a, b', 'b, a'})
+ self.assertEqual(1, len(with_dashes))
+ self.assertTrue(with_dashes[0] in {'a-b', 'b-a'})
+
+ def test_bounded_sum(self):
+ # [START combine_bounded_sum]
+ pc = [1, 10, 100, 1000]
+ def bounded_sum(values, bound=500):
+ return min(sum(values), bound)
+ small_sum = pc | df.CombineGlobally(bounded_sum) # [500]
+ large_sum = pc | df.CombineGlobally(bounded_sum, bound=5000) # [1111]
+ # [END combine_bounded_sum]
+ self.assertEqual([500], small_sum)
+ self.assertEqual([1111], large_sum)
+
+ def test_combine_reduce(self):
+ factors = [2, 3, 5, 7]
+ # [START combine_reduce]
+ import functools
+ import operator
+ product = factors | df.CombineGlobally(functools.partial(reduce, operator.mul), 1)
+ # [END combine_reduce]
+ self.assertEqual([210], product)
+
+ def test_custom_average(self):
+ pc = [2, 3, 5, 7]
+
+
+ # [START combine_custom_average]
+ class AverageFn(df.CombineFn):
+ def create_accumulator(self):
+ return (0.0, 0)
+ def add_input(self, (sum, count), input):
+ return sum + input, count + 1
+ def merge_accumulators(self, accumulators):
+ sums, counts = zip(*accumulators)
+ return sum(sums), sum(counts)
+ def extract_output(self, (sum, count)):
+ return sum / count if count else float('NaN')
+ average = pc | df.CombineGlobally(AverageFn())
+ # [END combine_custom_average]
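+    # AverageFn's accumulator is a (sum, count) pair: create_accumulator
+    # seeds it, add_input folds in one input, merge_accumulators combines
+    # partial pairs, and extract_output computes the final mean.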
+ self.assertEqual([4.25], average)
+
+ def test_keys(self):
+ occurrences = [('cat', 1), ('cat', 5), ('dog', 5), ('cat', 9), ('dog', 2)]
+ unique_keys = occurrences | snippets.Keys()
+ self.assertEqual({'cat', 'dog'}, set(unique_keys))
+
+ def test_count(self):
+ occurrences = ['cat', 'dog', 'cat', 'cat', 'dog']
+ perkey_counts = occurrences | snippets.Count()
+ self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
new file mode 100644
index 0000000..67efb96
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -0,0 +1,61 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A streaming string-capitalization workflow.
+
+Important: streaming pipeline support in Python Dataflow is in development
+and is not yet available for use.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+ """Build and run the pipeline."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input_topic', dest='input_topic', required=True,
+ help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ parser.add_argument(
+ '--output_topic', dest='output_topic', required=True,
+ help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = df.Pipeline(argv=pipeline_args)
+
+  # Read from the PubSub input topic into a PCollection.
+ lines = p | df.io.Read(
+ 'read', df.io.PubSubSource(known_args.input_topic))
+
+ # Capitalize the characters in each line.
+ transformed = (lines
+ | (df.Map('capitalize', lambda x: x.upper())))
+
+ # Write to PubSub.
+ # pylint: disable=expression-not-assigned
+ transformed | df.io.Write(
+ 'pubsub_write', df.io.PubSubSink(known_args.output_topic))
+
+ p.run()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
new file mode 100644
index 0000000..210d301
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -0,0 +1,71 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A streaming word-counting workflow.
+
+Important: streaming pipeline support in Python Dataflow is in development
+and is not yet available for use.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+
+import google.cloud.dataflow as df
+import google.cloud.dataflow.transforms.window as window
+
+
+def run(argv=None):
+ """Build and run the pipeline."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input_topic', required=True,
+ help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ parser.add_argument(
+ '--output_topic', required=True,
+ help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = df.Pipeline(argv=pipeline_args)
+
+  # Read from the PubSub input topic into a PCollection.
+ lines = p | df.io.Read(
+ 'read', df.io.PubSubSource(known_args.input_topic))
+
+  # Count the occurrences of each word.
+ transformed = (lines
+ | (df.FlatMap('split',
+ lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
+ | df.Map('pair_with_one', lambda x: (x, 1))
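+                 # Assign each (word, 1) pair to a fixed 15-second window
+                 # before grouping and counting.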
+ | df.WindowInto(window.FixedWindows(15, 0))
+ | df.GroupByKey('group')
+ | df.Map('count', lambda (word, ones): (word, sum(ones)))
+ | df.Map('format', lambda tup: '%s: %d' % tup))
+
+ # Write to PubSub.
+ # pylint: disable=expression-not-assigned
+ transformed | df.io.Write(
+ 'pubsub_write', df.io.PubSubSink(known_args.output_topic))
+
+ p.run()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
new file mode 100644
index 0000000..cf87268
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -0,0 +1,99 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A word-counting workflow."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+empty_line_aggregator = df.Aggregator('emptyLines')
+average_word_size_aggregator = df.Aggregator('averageWordLength',
+ df.combiners.MeanCombineFn(),
+ float)
+
+
+class WordExtractingDoFn(df.DoFn):
+ """Parse each line of input text into words."""
+
+ def process(self, context):
+ """Returns an iterator over the words of this element.
+
+ The element is a line of text. If the line is blank, note that, too.
+
+ Args:
+ context: the call-specific context: data and aggregator.
+
+ Returns:
+      The list of words in this element.
+ """
+ text_line = context.element.strip()
+ if not text_line:
+ context.aggregate_to(empty_line_aggregator, 1)
+ words = re.findall(r'[A-Za-z\']+', text_line)
+ for w in words:
+ context.aggregate_to(average_word_size_aggregator, len(w))
+ return words
+
+
+def run(argv=None):
+ """Main entry point; defines and runs the wordcount pipeline."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = df.Pipeline(argv=pipeline_args)
+
+ # Read the text file[pattern] into a PCollection.
+ lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+
+ # Count the occurrences of each word.
+ counts = (lines
+ | (df.ParDo('split', WordExtractingDoFn())
+ .with_output_types(unicode))
+ | df.Map('pair_with_one', lambda x: (x, 1))
+ | df.GroupByKey('group')
+ | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+ # Format the counts into a PCollection of strings.
+ output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+
+ # Actually run the pipeline (all operations above are deferred).
+ result = p.run()
+ empty_line_values = result.aggregated_values(empty_line_aggregator)
+ logging.info('number of empty lines: %d', sum(empty_line_values.values()))
+ word_length_values = result.aggregated_values(average_word_size_aggregator)
+ logging.info('average word lengths: %s', word_length_values.values())
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
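
The wordcount pipeline above can be exercised directly by calling run() with
local paths, which is exactly what the unit tests further below do. A minimal
sketch (the paths here are placeholders, not real sample data):

    from google.cloud.dataflow.examples import wordcount

    # Placeholder paths; any local text file works as input.
    wordcount.run([
        '--input=/tmp/kinglear.txt',
        '--output=/tmp/kinglear.result'])
    # With a single output shard, the counts land in
    # /tmp/kinglear.result-00000-of-00001.
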
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
new file mode 100644
index 0000000..66d4eb1
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -0,0 +1,154 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example that verifies the counts and includes Dataflow best practices.
+
+On top of the basic concepts in the wordcount example, this workflow introduces
+logging to Cloud Logging and the use of assertions in a Dataflow pipeline.
+
+To execute this pipeline locally, specify a local output file or output prefix
+on GCS::
+
+ --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration::
+
+ --project YOUR_PROJECT_ID
+ --staging_location gs://YOUR_STAGING_DIRECTORY
+ --temp_location gs://YOUR_TEMP_DIRECTORY
+ --job_name YOUR_JOB_NAME
+ --runner BlockingDataflowPipelineRunner
+
+and an output prefix on GCS::
+
+ --output gs://YOUR_OUTPUT_PREFIX
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+class FilterTextFn(df.DoFn):
+ """A DoFn that filters for a specific key based on a regular expression."""
+
+  # A custom aggregator can track values in your pipeline as it runs. Those
+  # values will be displayed in the Dataflow Monitoring UI when this pipeline
+  # is run using the Dataflow service. The aggregators below track the number
+  # of matched and unmatched words. Learn more about the Dataflow Monitoring
+  # UI at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf.
+ matched_words = df.Aggregator('matched_words')
+  unmatched_words = df.Aggregator('unmatched_words')
+
+ def __init__(self, pattern):
+ super(FilterTextFn, self).__init__()
+ self.pattern = pattern
+
+ def process(self, context):
+ word, _ = context.element
+ if re.match(self.pattern, word):
+ # Log at INFO level each element we match. When executing this pipeline
+ # using the Dataflow service, these log lines will appear in the Cloud
+ # Logging UI.
+ logging.info('Matched %s', word)
+ context.aggregate_to(self.matched_words, 1)
+ yield context.element
+ else:
+      # Log at DEBUG level each element that is not matched. Different log
+      # levels can be used to control the verbosity of logging, providing an
+      # effective mechanism to filter out less important information. Note
+      # that currently only logs at INFO level and higher are emitted to
+      # Cloud Logging, so this message will not be visible there.
+ logging.debug('Did not match %s', word)
+      context.aggregate_to(self.unmatched_words, 1)
+
+
+class CountWords(df.PTransform):
+ """A transform to count the occurrences of each word.
+
+ A PTransform that converts a PCollection containing lines of text into a
+ PCollection of (word, count) tuples.
+ """
+
+ def __init__(self):
+ super(CountWords, self).__init__()
+
+ def apply(self, pcoll):
+ return (pcoll
+ | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
+ | df.Map('pair_with_one', lambda x: (x, 1))
+ | df.GroupByKey('group')
+ | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+
+def run(argv=None):
+ """Runs the debugging wordcount pipeline."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = df.Pipeline(argv=pipeline_args)
+
+  # Read the text file[pattern] into a PCollection, count the occurrences of
+  # each word, and filter for the words matching a given pattern.
+ filtered_words = (
+ p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+ | CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+
+  # assert_that is a convenient PTransform that checks that a PCollection has
+  # an expected value. Asserts are best used in unit tests with small data
+  # sets, but are demonstrated here as a teaching tool.
+  #
+  # Note that assert_that produces no output; successful completion of the
+  # pipeline implies that the expectations were met. Learn more about testing
+  # your pipeline at
+  # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
+ df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
+
+ # Format the counts into a PCollection of strings and write the output using a
+ # "Write" transform that has side effects.
+ # pylint: disable=unused-variable
+ output = (filtered_words
+ | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+
+ # Actually run the pipeline (all operations above are deferred).
+ p.run()
+
+
+if __name__ == '__main__':
+  # Cloud Logging contains only the logs at logging.INFO level and higher that
+  # are emitted by the root logger; those log statements will be visible in
+  # the Cloud Logging UI. Learn more about the Cloud Logging UI at
+  # https://cloud.google.com/logging.
+  #
+  # You can set the default logging level to a different level when running
+  # locally.
+ logging.getLogger().setLevel(logging.INFO)
+ run()
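
The df.assert_that / df.equal_to pair used above also works on small
in-memory PCollections, which is the usual shape of a pipeline unit test. A
minimal sketch, assuming df.Create and the direct runner behave as elsewhere
in this SDK:

    import google.cloud.dataflow as df

    p = df.Pipeline('DirectPipelineRunner')
    doubled = (p
               | df.Create('start', [1, 2, 3])
               | df.Map('double', lambda x: x * 2))
    # assert_that produces no output; a failed expectation surfaces as an
    # error when the pipeline runs.
    df.assert_that(doubled, df.equal_to([2, 4, 6]))
    p.run()
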
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_debugging_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
new file mode 100644
index 0000000..aa517d6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
@@ -0,0 +1,56 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the debugging wordcount example."""
+
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount_debugging
+
+
+class WordCountTest(unittest.TestCase):
+
+ SAMPLE_TEXT = 'xx yy Flourish\n zz Flourish Flourish stomach\n aa\n bb cc dd'
+
+ def create_temp_file(self, contents):
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write(contents)
+ return f.name
+
+ def get_results(self, temp_path):
+ results = []
+ with open(temp_path + '.result-00000-of-00001') as result_file:
+ for line in result_file:
+ match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
+ if match is not None:
+ results.append((match.group(1), int(match.group(2))))
+ return results
+
+ def test_basics(self):
+ temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+ expected_words = [('Flourish', 3), ('stomach', 1)]
+ wordcount_debugging.run([
+ '--input=%s*' % temp_path,
+ '--output=%s.result' % temp_path])
+
+ # Parse result file and compare.
+ results = self.get_results(temp_path)
+ self.assertEqual(sorted(results), sorted(expected_words))
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
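
The '.result-00000-of-00001' suffix this test opens reflects the sharded file
naming of the text sink: each shard is named prefix-SSSSS-of-NNNNN. The
template below is inferred from the names the tests expect, not taken from an
SDK API:

    def shard_name(prefix, shard_num, num_shards):
      # Reproduces the naming the tests rely on: zero-padded shard index
      # and shard count appended to the output prefix.
      return '%s-%05d-of-%05d' % (prefix, shard_num, num_shards)

    assert shard_name('/tmp/out.result', 0, 1) == (
        '/tmp/out.result-00000-of-00001')
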
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
new file mode 100644
index 0000000..69f3986
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -0,0 +1,111 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A minimalist word-counting workflow that counts words in Shakespeare.
+
+This is the first in a series of successively more detailed 'word count'
+examples.
+
+Next, see the wordcount pipeline, then the wordcount_debugging pipeline, which
+introduce additional concepts.
+
+Concepts:
+
+1. Reading data from text files
+2. Specifying 'inline' transforms
+3. Counting a PCollection
+4. Writing data to Cloud Storage as text files
+
+To execute this pipeline locally, first edit the code to specify the output
+location, which may be a local file path or an output prefix on GCS. (Only
+update the output location marked with the first CHANGE comment.)
+
+To execute this pipeline remotely, first edit the code to set your project ID,
+runner type, the staging location, the temp location, and the output location.
+The specified GCS bucket(s) must already exist. (Update all the places marked
+with a CHANGE comment.)
+
+Then, run the pipeline as described in the README. It will be deployed and run
+using the Google Cloud Dataflow Service. No arguments are required to run the
+pipeline. You can see the results in your output bucket in the GCS browser.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+ """Main entry point; defines and runs the wordcount pipeline."""
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument('--output',
+ dest='output',
+ # CHANGE 1/5: The Google Cloud Storage path is required
+ # for outputting the results.
+ default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ pipeline_args.extend([
+ # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
+ # run your pipeline on the Google Cloud Dataflow Service.
+ '--runner=DirectPipelineRunner',
+ # CHANGE 3/5: Your project ID is required in order to run your pipeline on
+ # the Google Cloud Dataflow Service.
+ '--project=SET_YOUR_PROJECT_ID_HERE',
+ # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
+ # files.
+ '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',
+ # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
+ # files.
+ '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',
+ '--job_name=your-wordcount-job',
+ ])
+
+ p = df.Pipeline(argv=pipeline_args)
+
+ # Read the text file[pattern] into a PCollection.
+ lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
+
+ # Count the occurrences of each word.
+ counts = (lines
+ | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
+ | df.Map('pair_with_one', lambda x: (x, 1))
+ | df.GroupByKey('group')
+ | df.Map('count', lambda (word, ones): (word, sum(ones))))
+
+ # Format the counts into a PCollection of strings.
+ output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
+
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | df.io.Write('write', df.io.TextFileSink(known_args.output))
+
+ # Actually run the pipeline (all operations above are deferred).
+ p.run()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
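
The split / pair_with_one / group / count stages above are a classic
map-shuffle-reduce word count. For intuition, the same logic in plain Python
(sequential, no SDK):

    import collections
    import re

    lines = ['to be or not to be']
    # split + pair_with_one: emit (word, 1) for every word in every line.
    pairs = [(w, 1) for line in lines
             for w in re.findall(r"[A-Za-z']+", line)]
    # GroupByKey: collect the 1s for each word.
    grouped = collections.defaultdict(list)
    for word, one in pairs:
      grouped[word].append(one)
    # count: sum the grouped values.
    counts = dict((word, sum(ones)) for word, ones in grouped.items())
    assert counts == {'to': 2, 'be': 2, 'or': 1, 'not': 1}
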
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_minimal_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
new file mode 100644
index 0000000..f110c21
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
@@ -0,0 +1,56 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the minimal wordcount example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount_minimal
+
+
+class WordCountMinimalTest(unittest.TestCase):
+ """Unit test for wordcount_minimal example with direct runner."""
+
+ SAMPLE_TEXT = 'a b c a b a\n aa bb cc aa bb aa'
+
+ def create_temp_file(self, contents):
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write(contents)
+ return f.name
+
+ def test_basics(self):
+ temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+ expected_words = collections.defaultdict(int)
+ for word in re.findall(r'\w+', self.SAMPLE_TEXT):
+ expected_words[word] += 1
+ wordcount_minimal.run([
+ '--input=%s*' % temp_path,
+ '--output=%s.result' % temp_path])
+ # Parse result file and compare.
+ results = []
+ with open(temp_path + '.result-00000-of-00001') as result_file:
+ for line in result_file:
+ match = re.search(r'([a-z]+): ([0-9]+)', line)
+ if match is not None:
+ results.append((match.group(1), int(match.group(2))))
+ self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/wordcount_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py
new file mode 100644
index 0000000..72b1e32
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_test.py
@@ -0,0 +1,55 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the wordcount example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples import wordcount
+
+
+class WordCountTest(unittest.TestCase):
+
+ SAMPLE_TEXT = 'a b c a b a\n\n aa bb cc aa bb aa'
+
+ def create_temp_file(self, contents):
+ with tempfile.NamedTemporaryFile(delete=False) as f:
+ f.write(contents)
+ return f.name
+
+ def test_basics(self):
+ temp_path = self.create_temp_file(self.SAMPLE_TEXT)
+ expected_words = collections.defaultdict(int)
+ for word in re.findall(r'\w+', self.SAMPLE_TEXT):
+ expected_words[word] += 1
+ wordcount.run([
+ '--input=%s*' % temp_path,
+ '--output=%s.result' % temp_path])
+ # Parse result file and compare.
+ results = []
+ with open(temp_path + '.result-00000-of-00001') as result_file:
+ for line in result_file:
+ match = re.search(r'([a-z]+): ([0-9]+)', line)
+ if match is not None:
+ results.append((match.group(1), int(match.group(2))))
+ self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/__init__.py b/sdks/python/apache_beam/internal/__init__.py
new file mode 100644
index 0000000..e69de29