Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:57 UTC
[22/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/snippets/snippets.py b/sdks/python/google/cloud/dataflow/examples/snippets/snippets.py
deleted file mode 100644
index f6bb63a..0000000
--- a/sdks/python/google/cloud/dataflow/examples/snippets/snippets.py
+++ /dev/null
@@ -1,872 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Code snippets used in Cloud Dataflow webdocs.
-
-The examples here are written specifically to read well with the accompanying
-web docs from https://cloud.google.com/dataflow. Do not rewrite them without
-first making sure the webdocs still read well and the rewritten code still
-supports the concept being described. For example, some snippets could be
-shorter, but they are written this way to make a specific point in the docs.
-
-The code snippets are all organized as self-contained functions. Parts of the
-function body delimited by [START tag] and [END tag] will be included
-automatically in the web docs. The naming convention for a tag is the
-PATH_TO_HTML where it is included, followed by a descriptive string. For
-instance, a code snippet used as a code example at
-https://cloud.google.com/dataflow/model/pipelines will have the tag
-model_pipelines_DESCRIPTION. Tags can contain only letters, digits and _.
-"""
-
-import google.cloud.dataflow as df
-
-# Quiet some pylint warnings that happen because of the somewhat special
-# format for the code snippets.
-# pylint:disable=invalid-name
-# pylint:disable=expression-not-assigned
-# pylint:disable=redefined-outer-name
-# pylint:disable=unused-variable
-# pylint:disable=g-doc-args
-# pylint:disable=g-import-not-at-top
-
-
-class SnippetUtils(object):
- from google.cloud.dataflow.pipeline import PipelineVisitor
-
- class RenameFiles(PipelineVisitor):
- """RenameFiles will rewire source and sink for unit testing.
-
-    RenameFiles will rewire the GCS files specified in the source and sink of
-    the snippet pipeline to local files so the pipeline can be run as a unit
-    test. This is as close as we can get to having code snippets that are
-    executed and are also ready to be presented in the webdocs.
- """
-
- def __init__(self, renames):
- self.renames = renames
-
- def visit_transform(self, transform_node):
- if hasattr(transform_node.transform, 'source'):
- source = transform_node.transform.source
- source.file_path = self.renames['read']
- source.is_gcs_source = False
- elif hasattr(transform_node.transform, 'sink'):
- sink = transform_node.transform.sink
- sink.file_path = self.renames['write']
- sink.is_gcs_sink = False
-
-
-def construct_pipeline(renames):
- """A reverse words snippet as an example for constructing a pipeline.
-
- URL: https://cloud.google.com/dataflow/pipelines/constructing-your-pipeline
- """
- import re
-
- class ReverseWords(df.PTransform):
- """A PTransform that reverses individual elements in a PCollection."""
-
- def apply(self, pcoll):
- return pcoll | df.Map(lambda e: e[::-1])
-
- def filter_words(unused_x):
- """Pass through filter to select everything."""
- return True
-
- # [START pipelines_constructing_creating]
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- p = df.Pipeline(options=PipelineOptions())
- # [END pipelines_constructing_creating]
-
- # [START pipelines_constructing_reading]
- lines = p | df.io.Read('ReadMyFile',
- df.io.TextFileSource('gs://some/inputData.txt'))
- # [END pipelines_constructing_reading]
-
- # [START pipelines_constructing_applying]
- words = lines | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- reversed_words = words | ReverseWords()
- # [END pipelines_constructing_applying]
-
- # [START pipelines_constructing_writing]
- filtered_words = reversed_words | df.Filter('FilterWords', filter_words)
- filtered_words | df.io.Write('WriteMyFile',
- df.io.TextFileSink('gs://some/outputData.txt'))
- # [END pipelines_constructing_writing]
-
- p.visit(SnippetUtils.RenameFiles(renames))
-
- # [START pipelines_constructing_running]
- p.run()
- # [END pipelines_constructing_running]
-
-
-def model_pipelines(argv):
- """A wordcount snippet as a simple pipeline example.
-
- URL: https://cloud.google.com/dataflow/model/pipelines
- """
- # [START model_pipelines]
- import re
-
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- class MyOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--input',
- dest='input',
- default='gs://dataflow-samples/shakespeare/kinglear'
- '.txt',
- help='Input file to process.')
- parser.add_argument('--output',
- dest='output',
- required=True,
- help='Output file to write results to.')
-
- pipeline_options = PipelineOptions(argv)
- my_options = pipeline_options.view_as(MyOptions)
-
- p = df.Pipeline(options=pipeline_options)
-
- (p
- | df.io.Read(df.io.TextFileSource(my_options.input))
- | df.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- | df.Map(lambda x: (x, 1)) | df.combiners.Count.PerKey()
- | df.io.Write(df.io.TextFileSink(my_options.output)))
-
- p.run()
- # [END model_pipelines]
-
-
-def model_pcollection(argv):
- """Creating a PCollection from data in local memory.
-
- URL: https://cloud.google.com/dataflow/model/pcollection
- """
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- class MyOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--output',
- dest='output',
- required=True,
- help='Output file to write results to.')
-
- pipeline_options = PipelineOptions(argv)
- my_options = pipeline_options.view_as(MyOptions)
-
- # [START model_pcollection]
- p = df.Pipeline(options=pipeline_options)
-
- (p
- | df.Create([
- 'To be, or not to be: that is the question: ',
- 'Whether \'tis nobler in the mind to suffer ',
- 'The slings and arrows of outrageous fortune, ',
- 'Or to take arms against a sea of troubles, '])
- | df.io.Write(df.io.TextFileSink(my_options.output)))
-
- p.run()
- # [END model_pcollection]
-
-
-def pipeline_options_remote(argv):
- """"Creating a Pipeline using a PipelineOptions object for remote execution.
-
- URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- """
-
- from google.cloud.dataflow import Pipeline
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START pipeline_options_create]
- options = PipelineOptions(flags=argv)
- # [END pipeline_options_create]
-
- # [START pipeline_options_define_custom]
- class MyOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--input')
- parser.add_argument('--output')
- # [END pipeline_options_define_custom]
-
- from google.cloud.dataflow.utils.options import GoogleCloudOptions
- from google.cloud.dataflow.utils.options import StandardOptions
-
- # [START pipeline_options_dataflow_service]
- # Create and set your PipelineOptions.
- options = PipelineOptions(flags=argv)
-
- # For Cloud execution, set the Cloud Platform project, job_name,
-  # staging_location and temp_location, and specify DataflowPipelineRunner or
-  # BlockingDataflowPipelineRunner as the runner.
- google_cloud_options = options.view_as(GoogleCloudOptions)
- google_cloud_options.project = 'my-project-id'
- google_cloud_options.job_name = 'myjob'
- google_cloud_options.staging_location = 'gs://my-bucket/binaries'
- google_cloud_options.temp_location = 'gs://my-bucket/temp'
- options.view_as(StandardOptions).runner = 'DataflowPipelineRunner'
-
- # Create the Pipeline with the specified options.
- p = Pipeline(options=options)
- # [END pipeline_options_dataflow_service]
-
- my_options = options.view_as(MyOptions)
- my_input = my_options.input
- my_output = my_options.output
-
- # Overriding the runner for tests.
- options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
- p = Pipeline(options=options)
-
- lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
- lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
-
- p.run()
-
-
-def pipeline_options_local(argv):
- """"Creating a Pipeline using a PipelineOptions object for local execution.
-
- URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- """
-
- from google.cloud.dataflow import Pipeline
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- options = PipelineOptions(flags=argv)
-
- # [START pipeline_options_define_custom_with_help_and_default]
- class MyOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--input',
- help='Input for the dataflow pipeline',
- default='gs://my-bucket/input')
- parser.add_argument('--output',
- help='Output for the dataflow pipeline',
- default='gs://my-bucket/output')
- # [END pipeline_options_define_custom_with_help_and_default]
-
- my_options = options.view_as(MyOptions)
-
- my_input = my_options.input
- my_output = my_options.output
-
- # [START pipeline_options_local]
- # Create and set your Pipeline Options.
- options = PipelineOptions()
- p = Pipeline(options=options)
- # [END pipeline_options_local]
-
- lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(my_input))
- lines | df.io.Write('WriteToText', df.io.TextFileSink(my_output))
- p.run()
-
-
-def pipeline_options_command_line(argv):
- """Creating a Pipeline by passing a list of arguments.
-
- URL: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- """
-
- # [START pipeline_options_command_line]
- # Use Python argparse module to parse custom arguments
- import argparse
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--input')
- parser.add_argument('--output')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- # Create the Pipeline with remaining arguments.
- p = df.Pipeline(argv=pipeline_args)
- lines = p | df.io.Read('ReadFromText', df.io.TextFileSource(known_args.input))
- lines | df.io.Write('WriteToText', df.io.TextFileSink(known_args.output))
- # [END pipeline_options_command_line]
-
- p.run()
-
-
-def pipeline_logging(lines, output):
- """Logging Pipeline Messages.
-
- URL: https://cloud.google.com/dataflow/pipelines/logging
- """
-
- import re
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START pipeline_logging]
-  # Import the Python logging module.
- import logging
-
- class ExtractWordsFn(df.DoFn):
-
- def process(self, context):
- words = re.findall(r'[A-Za-z\']+', context.element)
- for word in words:
- yield word
-
- if word.lower() == 'love':
- # Log using the root logger at info or higher levels
- logging.info('Found : %s', word.lower())
-
- # Remaining WordCount example code ...
- # [END pipeline_logging]
-
- p = df.Pipeline(options=PipelineOptions())
- (p
- | df.Create(lines)
- | df.ParDo('ExtractWords', ExtractWordsFn())
- | df.io.Write('WriteToText', df.io.TextFileSink(output)))
-
- p.run()
-
-
-def pipeline_monitoring(renames):
- """Using monitoring interface snippets.
-
- URL: https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf
- """
-
- import re
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- class WordCountOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--input',
- help='Input for the dataflow pipeline',
- default='gs://my-bucket/input')
- parser.add_argument('--output',
-                          help='Output for the dataflow pipeline',
- default='gs://my-bucket/output')
-
- class ExtractWordsFn(df.DoFn):
-
- def process(self, context):
- words = re.findall(r'[A-Za-z\']+', context.element)
- for word in words:
- yield word
-
- class FormatCountsFn(df.DoFn):
-
- def process(self, context):
- word, count = context.element
- yield '%s: %s' % (word, count)
-
- # [START pipeline_monitoring_composite]
- # The CountWords Composite Transform inside the WordCount pipeline.
- class CountWords(df.PTransform):
-
- def apply(self, pcoll):
- return (pcoll
- # Convert lines of text into individual words.
- | df.ParDo('ExtractWords', ExtractWordsFn())
- # Count the number of times each word occurs.
- | df.combiners.Count.PerElement()
- # Format each word and count into a printable string.
- | df.ParDo('FormatCounts', FormatCountsFn()))
- # [END pipeline_monitoring_composite]
-
- pipeline_options = PipelineOptions()
- options = pipeline_options.view_as(WordCountOptions)
- p = df.Pipeline(options=pipeline_options)
-
- # [START pipeline_monitoring_execution]
- (p
- # Read the lines of the input text.
- | df.io.Read('ReadLines', df.io.TextFileSource(options.input))
- # Count the words.
- | CountWords()
- # Write the formatted word counts to output.
- | df.io.Write('WriteCounts', df.io.TextFileSink(options.output)))
- # [END pipeline_monitoring_execution]
-
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
-
-
-def examples_wordcount_minimal(renames):
- """MinimalWordCount example snippets.
-
- URL:
- https://cloud.google.com/dataflow/examples/wordcount-example#MinimalWordCount
- """
- import re
-
- import google.cloud.dataflow as df
-
- from google.cloud.dataflow.utils.options import GoogleCloudOptions
- from google.cloud.dataflow.utils.options import StandardOptions
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START examples_wordcount_minimal_options]
- options = PipelineOptions()
- google_cloud_options = options.view_as(GoogleCloudOptions)
- google_cloud_options.project = 'my-project-id'
- google_cloud_options.job_name = 'myjob'
- google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
- google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
- options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
- # [END examples_wordcount_minimal_options]
-
- # Run it locally for testing.
- options = PipelineOptions()
-
- # [START examples_wordcount_minimal_create]
- p = df.Pipeline(options=options)
- # [END examples_wordcount_minimal_create]
-
- (
- # [START examples_wordcount_minimal_read]
- p | df.io.Read(df.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
- # [END examples_wordcount_minimal_read]
-
- # [START examples_wordcount_minimal_pardo]
- | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
- # [END examples_wordcount_minimal_pardo]
-
- # [START examples_wordcount_minimal_count]
- | df.combiners.Count.PerElement()
- # [END examples_wordcount_minimal_count]
-
- # [START examples_wordcount_minimal_map]
- | df.Map(lambda (word, count): '%s: %s' % (word, count))
- # [END examples_wordcount_minimal_map]
-
- # [START examples_wordcount_minimal_write]
- | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
- # [END examples_wordcount_minimal_write]
- )
-
- p.visit(SnippetUtils.RenameFiles(renames))
-
- # [START examples_wordcount_minimal_run]
- p.run()
- # [END examples_wordcount_minimal_run]
-
-
-def examples_wordcount_wordcount(renames):
- """WordCount example snippets.
-
- URL:
- https://cloud.google.com/dataflow/examples/wordcount-example#WordCount
- """
- import re
-
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- argv = []
-
- # [START examples_wordcount_wordcount_options]
- class WordCountOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--input',
- help='Input for the dataflow pipeline',
- default='gs://my-bucket/input')
-
- options = PipelineOptions(argv)
- p = df.Pipeline(options=options)
- # [END examples_wordcount_wordcount_options]
-
- lines = p | df.io.Read(df.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
-
- # [START examples_wordcount_wordcount_composite]
- class CountWords(df.PTransform):
-
- def apply(self, pcoll):
- return (pcoll
- # Convert lines of text into individual words.
- | df.FlatMap(
- 'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
-
- # Count the number of times each word occurs.
- | df.combiners.Count.PerElement())
-
- counts = lines | CountWords()
- # [END examples_wordcount_wordcount_composite]
-
- # [START examples_wordcount_wordcount_dofn]
- class FormatAsTextFn(df.DoFn):
-
- def process(self, context):
- word, count = context.element
- yield '%s: %s' % (word, count)
-
- formatted = counts | df.ParDo(FormatAsTextFn())
- # [END examples_wordcount_wordcount_dofn]
-
- formatted | df.io.Write(df.io.TextFileSink('gs://my-bucket/counts.txt'))
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
-
-
-def examples_wordcount_debugging(renames):
- """DebuggingWordCount example snippets.
-
- URL:
- https://cloud.google.com/dataflow/examples/wordcount-example#DebuggingWordCount
- """
- import re
-
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START example_wordcount_debugging_logging]
- # [START example_wordcount_debugging_aggregators]
- import logging
-
- class FilterTextFn(df.DoFn):
- """A DoFn that filters for a specific key based on a regular expression."""
-
- # A custom aggregator can track values in your pipeline as it runs. Create
-    # custom aggregators matched_words and unmatched_words.
-    matched_words = df.Aggregator('matched_words')
-    unmatched_words = df.Aggregator('unmatched_words')
-
- def __init__(self, pattern):
- self.pattern = pattern
-
- def process(self, context):
- word, _ = context.element
- if re.match(self.pattern, word):
- # Log at INFO level each element we match. When executing this pipeline
- # using the Dataflow service, these log lines will appear in the Cloud
- # Logging UI.
- logging.info('Matched %s', word)
-
- # Add 1 to the custom aggregator matched_words
- context.aggregate_to(self.matched_words, 1)
- yield context.element
- else:
- # Log at the "DEBUG" level each element that is not matched. Different
- # log levels can be used to control the verbosity of logging providing
- # an effective mechanism to filter less important information. Note
- # currently only "INFO" and higher level logs are emitted to the Cloud
- # Logger. This log message will not be visible in the Cloud Logger.
- logging.debug('Did not match %s', word)
-
-        # Add 1 to the custom aggregator unmatched_words
-        context.aggregate_to(self.unmatched_words, 1)
- # [END example_wordcount_debugging_logging]
- # [END example_wordcount_debugging_aggregators]
-
- p = df.Pipeline(options=PipelineOptions())
- filtered_words = (
- p
- | df.io.Read(df.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
- | df.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
- | df.combiners.Count.PerElement()
- | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
-
- # [START example_wordcount_debugging_assert]
- df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
- # [END example_wordcount_debugging_assert]
-
- output = (filtered_words
- | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
- | df.io.Write(
- 'write', df.io.TextFileSink('gs://my-bucket/counts.txt')))
-
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
-
-
-def model_textio(renames):
- """Using a Read and Write transform to read/write text files.
-
- URLs:
- https://cloud.google.com/dataflow/model/pipeline-io
- https://cloud.google.com/dataflow/model/text-io
- """
- def filter_words(x):
- import re
- return re.findall(r'[A-Za-z\']+', x)
-
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START model_textio_read]
- p = df.Pipeline(options=PipelineOptions())
- # [START model_pipelineio_read]
- lines = p | df.io.Read(
- 'ReadFromText',
- df.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
- # [END model_pipelineio_read]
- # [END model_textio_read]
-
- # [START model_textio_write]
- filtered_words = lines | df.FlatMap('FilterWords', filter_words)
- # [START model_pipelineio_write]
- filtered_words | df.io.Write(
- 'WriteToText', df.io.TextFileSink('gs://my_bucket/path/to/numbers',
- file_name_suffix='.csv'))
- # [END model_pipelineio_write]
- # [END model_textio_write]
-
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
-
-
-def model_bigqueryio():
- """Using a Read and Write transform to read/write to BigQuery.
-
- URL: https://cloud.google.com/dataflow/model/bigquery-io
- """
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
-
- # [START model_bigqueryio_read]
- p = df.Pipeline(options=PipelineOptions())
- weather_data = p | df.io.Read(
- 'ReadWeatherStations',
- df.io.BigQuerySource(
- 'clouddataflow-readonly:samples.weather_stations'))
- # [END model_bigqueryio_read]
-
- # [START model_bigqueryio_query]
- p = df.Pipeline(options=PipelineOptions())
- weather_data = p | df.io.Read(
- 'ReadYearAndTemp',
- df.io.BigQuerySource(
- query='SELECT year, mean_temp FROM samples.weather_stations'))
- # [END model_bigqueryio_query]
-
- # [START model_bigqueryio_schema]
- schema = 'source:STRING, quote:STRING'
- # [END model_bigqueryio_schema]
-
- # [START model_bigqueryio_write]
- quotes = p | df.Create(
-      [{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'}])
- quotes | df.io.Write(
- 'Write', df.io.BigQuerySink(
- 'my-project:output.output_table',
- schema=schema,
- write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE,
- create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED))
- # [END model_bigqueryio_write]
-
-
-def model_composite_transform_example(contents, output_path):
- """Example of a composite transform.
-
- To declare a composite transform, define a subclass of PTransform.
-
-  Override its apply method with one that takes a PCollection as its only
-  parameter and returns a PCollection.
-
- URL: https://cloud.google.com/dataflow/model/composite-transforms
- """
- import re
-
- import google.cloud.dataflow as df
-
- # [START composite_transform_example]
- # [START composite_ptransform_apply_method]
- # [START composite_ptransform_declare]
- class CountWords(df.PTransform):
- # [END composite_ptransform_declare]
-
- def apply(self, pcoll):
- return (pcoll
- | df.FlatMap(lambda x: re.findall(r'\w+', x))
- | df.combiners.Count.PerElement()
- | df.Map(lambda (word, c): '%s: %s' % (word, c)))
- # [END composite_ptransform_apply_method]
- # [END composite_transform_example]
-
- from google.cloud.dataflow.utils.options import PipelineOptions
- p = df.Pipeline(options=PipelineOptions())
- (p
- | df.Create(contents)
- | CountWords()
- | df.io.Write(df.io.TextFileSink(output_path)))
- p.run()
-
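The declare-and-apply pattern in the docstring above generalizes to any
composite transform. As a second minimal sketch (not part of the original
file; the class name is invented and it reuses only transforms already shown
above):

class NormalizeAndCount(df.PTransform):
  """Illustrative composite: normalize each line, then count occurrences."""

  def apply(self, pcoll):
    return (pcoll
            | df.Map(lambda line: line.strip().lower())
            | df.combiners.Count.PerElement())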
-
-def model_multiple_pcollections_flatten(contents, output_path):
- """Merging a PCollection with Flatten.
-
- URL: https://cloud.google.com/dataflow/model/multiple-pcollections
- """
- some_hash_fn = lambda s: ord(s[0])
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
- p = df.Pipeline(options=PipelineOptions())
- partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
-
-  # Partition the input into three pieces based on a hash of each element.
- partitioned = p | df.Create(contents) | df.Partition(partition_fn, 3)
- pcoll1 = partitioned[0]
- pcoll2 = partitioned[1]
- pcoll3 = partitioned[2]
-
-  # Flatten them back into a single PCollection.
-
- # A collection of PCollection objects can be represented simply
- # as a tuple (or list) of PCollections.
- # (The SDK for Python has no separate type to store multiple
- # PCollection objects, whether containing the same or different
- # types.)
- # [START model_multiple_pcollections_flatten]
- merged = (
- # [START model_multiple_pcollections_tuple]
- (pcoll1, pcoll2, pcoll3)
- # [END model_multiple_pcollections_tuple]
- # A list of tuples can be "piped" directly into a Flatten transform.
- | df.Flatten())
- # [END model_multiple_pcollections_flatten]
- merged | df.io.Write(df.io.TextFileSink(output_path))
-
- p.run()
-
-
-def model_multiple_pcollections_partition(contents, output_path):
- """Splitting a PCollection with Partition.
-
- URL: https://cloud.google.com/dataflow/model/multiple-pcollections
- """
- some_hash_fn = lambda s: ord(s[0])
- def get_percentile(i):
- """Assume i in [0,100)."""
- return i
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
- p = df.Pipeline(options=PipelineOptions())
-
- students = p | df.Create(contents)
- # [START model_multiple_pcollections_partition]
- def partition_fn(student, num_partitions):
- return int(get_percentile(student) * num_partitions / 100)
-
- by_decile = students | df.Partition(partition_fn, 10)
- # [END model_multiple_pcollections_partition]
- # [START model_multiple_pcollections_partition_40th]
- fortieth_percentile = by_decile[4]
- # [END model_multiple_pcollections_partition_40th]
-
- ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
- | df.Flatten()
- | df.io.Write(df.io.TextFileSink(output_path)))
-
- p.run()
-
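To make the decile arithmetic in partition_fn above concrete, here is a small
SDK-independent check (illustrative only) of which partition index a given
percentile maps to:

# partition_fn computes int(percentile * num_partitions / 100), so a student
# at the 42nd percentile lands in index 4 -- the bucket exposed above as
# fortieth_percentile.
def partition_index(percentile, num_partitions=10):
  return int(percentile * num_partitions / 100)

assert partition_index(42) == 4
assert partition_index(39) == 3
assert partition_index(99) == 9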
-
-def model_group_by_key(contents, output_path):
- """Applying a GroupByKey Transform.
-
- URL: https://cloud.google.com/dataflow/model/group-by-key
- """
- import re
-
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
- p = df.Pipeline(options=PipelineOptions())
- words_and_counts = (
- p
- | df.Create(contents)
- | df.FlatMap(lambda x: re.findall(r'\w+', x))
- | df.Map('one word', lambda w: (w, 1)))
- # GroupByKey accepts a PCollection of (w, 1) and
- # outputs a PCollection of (w, (1, 1, ...)).
- # (A key/value pair is just a tuple in Python.)
- # This is a somewhat forced example, since one could
- # simply use df.combiners.Count.PerElement here.
- # [START model_group_by_key_transform]
- grouped_words = words_and_counts | df.GroupByKey()
- # [END model_group_by_key_transform]
- (grouped_words
- | df.Map('count words', lambda (word, counts): (word, len(counts)))
- | df.io.Write(df.io.TextFileSink(output_path)))
- p.run()
-
-
-def model_co_group_by_key_tuple(email_list, phone_list, output_path):
- """Applying a CoGroupByKey Transform to a tuple.
-
- URL: https://cloud.google.com/dataflow/model/group-by-key
- """
- import google.cloud.dataflow as df
- from google.cloud.dataflow.utils.options import PipelineOptions
- p = df.Pipeline(options=PipelineOptions())
- # [START model_group_by_key_cogroupbykey_tuple]
- # Each data set is represented by key-value pairs in separate PCollections.
- # Both data sets share a common key type (in this example str).
- # The email_list contains values such as: ('joe', 'joe@example.com') with
- # multiple possible values for each key.
-  # The phone_list contains values such as: ('mary', '111-222-3333') with
- # multiple possible values for each key.
- emails = p | df.Create('email', email_list)
- phones = p | df.Create('phone', phone_list)
- # The result PCollection contains one key-value element for each key in the
- # input PCollections. The key of the pair will be the key from the input and
-  # the value will be a dictionary with two entries: 'emails', an iterable of
-  # all values for the current key in the emails PCollection, and 'phones', an
-  # iterable of all values for the current key in the phones PCollection.
- # For instance, if 'emails' contained ('joe', 'joe@example.com') and
- # ('joe', 'joe@gmail.com'), then 'result' will contain the element
- # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
- result = {'emails': emails, 'phones': phones} | df.CoGroupByKey()
-
- def join_info((name, info)):
- return '; '.join(['%s' % name,
- '%s' % ','.join(info['emails']),
- '%s' % ','.join(info['phones'])])
-
- contact_lines = result | df.Map(join_info)
- # [END model_group_by_key_cogroupbykey_tuple]
- contact_lines | df.io.Write(df.io.TextFileSink(output_path))
- p.run()
-
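The grouping semantics described in the comment block above can also be
illustrated with plain Python, independent of the SDK. This is only a sketch
of the result shape, not how the runner computes it; the iterables are shown
as lists and the helper name is invented:

def cogroup(email_list, phone_list):
  """Build the {'emails': [...], 'phones': [...]} shape per key."""
  result = {}
  for name, email in email_list:
    result.setdefault(name, {'emails': [], 'phones': []})['emails'].append(email)
  for name, phone in phone_list:
    result.setdefault(name, {'emails': [], 'phones': []})['phones'].append(phone)
  return sorted(result.items())

# cogroup([('joe', 'joe@example.com'), ('joe', 'joe@gmail.com')],
#         [('joe', '555-1212')])
# == [('joe', {'emails': ['joe@example.com', 'joe@gmail.com'],
#              'phones': ['555-1212']})]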
-
-# [START model_library_transforms_keys]
-class Keys(df.PTransform):
-
- def apply(self, pcoll):
- return pcoll | df.Map('Keys', lambda (k, v): k)
-# [END model_library_transforms_keys]
-# pylint: enable=invalid-name
-
-
-# [START model_library_transforms_count]
-class Count(df.PTransform):
-
- def apply(self, pcoll):
- return (
- pcoll
- | df.Map('Init', lambda v: (v, 1))
- | df.CombinePerKey(sum))
-# [END model_library_transforms_count]
-# pylint: enable=g-wrong-blank-lines
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/snippets/snippets_test.py b/sdks/python/google/cloud/dataflow/examples/snippets/snippets_test.py
deleted file mode 100644
index 4c2014f..0000000
--- a/sdks/python/google/cloud/dataflow/examples/snippets/snippets_test.py
+++ /dev/null
@@ -1,560 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests for all code snippets used in public docs."""
-
-import logging
-import sys
-import tempfile
-import unittest
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow import io
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow import typehints
-from google.cloud.dataflow.examples.snippets import snippets
-from google.cloud.dataflow.io import fileio
-from google.cloud.dataflow.utils.options import TypeOptions
-
-
-# Monkey-patch to use the native sink for file path re-writing.
-io.TextFileSink = fileio.NativeTextFileSink
-
-
-class ParDoTest(unittest.TestCase):
- """Tests for dataflow/model/par-do."""
-
- def test_pardo(self):
- # Note: "words" and "ComputeWordLengthFn" are referenced by name in
- # the text of the doc.
-
- words = ['aa', 'bbb', 'c']
- # [START model_pardo_pardo]
- class ComputeWordLengthFn(df.DoFn):
- def process(self, context):
- return [len(context.element)]
- # [END model_pardo_pardo]
-
- # [START model_pardo_apply]
- # Apply a ParDo to the PCollection "words" to compute lengths for each word.
- word_lengths = words | df.ParDo(ComputeWordLengthFn())
- # [END model_pardo_apply]
- self.assertEqual({2, 3, 1}, set(word_lengths))
-
- def test_pardo_yield(self):
- words = ['aa', 'bbb', 'c']
- # [START model_pardo_yield]
- class ComputeWordLengthFn(df.DoFn):
- def process(self, context):
- yield len(context.element)
- # [END model_pardo_yield]
-
- word_lengths = words | df.ParDo(ComputeWordLengthFn())
- self.assertEqual({2, 3, 1}, set(word_lengths))
-
- def test_pardo_using_map(self):
- words = ['aa', 'bbb', 'c']
- # [START model_pardo_using_map]
- word_lengths = words | df.Map(len)
- # [END model_pardo_using_map]
-
- self.assertEqual({2, 3, 1}, set(word_lengths))
-
- def test_pardo_using_flatmap(self):
- words = ['aa', 'bbb', 'c']
- # [START model_pardo_using_flatmap]
- word_lengths = words | df.FlatMap(lambda word: [len(word)])
- # [END model_pardo_using_flatmap]
-
- self.assertEqual({2, 3, 1}, set(word_lengths))
-
- def test_pardo_using_flatmap_yield(self):
- words = ['aA', 'bbb', 'C']
- # [START model_pardo_using_flatmap_yield]
- def capitals(word):
- for letter in word:
- if 'A' <= letter <= 'Z':
- yield letter
- all_capitals = words | df.FlatMap(capitals)
- # [END model_pardo_using_flatmap_yield]
-
- self.assertEqual({'A', 'C'}, set(all_capitals))
-
- def test_pardo_with_label(self):
- words = ['aa', 'bbc', 'defg']
- # [START model_pardo_with_label]
- result = words | df.Map('CountUniqueLetters', lambda word: len(set(word)))
- # [END model_pardo_with_label]
-
- self.assertEqual({1, 2, 4}, set(result))
-
- def test_pardo_side_input(self):
- p = df.Pipeline('DirectPipelineRunner')
- words = p | df.Create('start', ['a', 'bb', 'ccc', 'dddd'])
-
- # [START model_pardo_side_input]
- # Callable takes additional arguments.
- def filter_using_length(word, lower_bound, upper_bound=float('inf')):
- if lower_bound <= len(word) <= upper_bound:
- yield word
-
- # Construct a deferred side input.
- avg_word_len = words | df.Map(len) | df.CombineGlobally(df.combiners.MeanCombineFn())
-
- # Call with explicit side inputs.
- small_words = words | df.FlatMap('small', filter_using_length, 0, 3)
-
- # A single deferred side input.
- larger_than_average = words | df.FlatMap('large',
- filter_using_length,
- lower_bound=pvalue.AsSingleton(avg_word_len))
-
- # Mix and match.
- small_but_nontrivial = words | df.FlatMap(filter_using_length,
- lower_bound=2,
- upper_bound=pvalue.AsSingleton(avg_word_len))
- # [END model_pardo_side_input]
-
- df.assert_that(small_words, df.equal_to(['a', 'bb', 'ccc']))
- df.assert_that(larger_than_average, df.equal_to(['ccc', 'dddd']),
- label='larger_than_average')
- df.assert_that(small_but_nontrivial, df.equal_to(['bb']),
- label='small_but_not_trivial')
- p.run()
-
- def test_pardo_side_input_dofn(self):
- words = ['a', 'bb', 'ccc', 'dddd']
-
- # [START model_pardo_side_input_dofn]
- class FilterUsingLength(df.DoFn):
- def process(self, context, lower_bound, upper_bound=float('inf')):
- if lower_bound <= len(context.element) <= upper_bound:
- yield context.element
-
- small_words = words | df.ParDo(FilterUsingLength(), 0, 3)
- # [END model_pardo_side_input_dofn]
- self.assertEqual({'a', 'bb', 'ccc'}, set(small_words))
-
- def test_pardo_with_side_outputs(self):
- # [START model_pardo_emitting_values_on_side_outputs]
- class ProcessWords(df.DoFn):
-
- def process(self, context, cutoff_length, marker):
- if len(context.element) <= cutoff_length:
- # Emit this short word to the main output.
- yield context.element
- else:
- # Emit this word's long length to a side output.
- yield pvalue.SideOutputValue(
- 'above_cutoff_lengths', len(context.element))
- if context.element.startswith(marker):
- # Emit this word to a different side output.
- yield pvalue.SideOutputValue('marked strings', context.element)
- # [END model_pardo_emitting_values_on_side_outputs]
-
- words = ['a', 'an', 'the', 'music', 'xyz']
-
- # [START model_pardo_with_side_outputs]
- results = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
- .with_outputs('above_cutoff_lengths',
- 'marked strings',
- main='below_cutoff_strings'))
- below = results.below_cutoff_strings
- above = results.above_cutoff_lengths
- marked = results['marked strings'] # indexing works as well
- # [END model_pardo_with_side_outputs]
-
- self.assertEqual({'a', 'an'}, set(below))
- self.assertEqual({3, 5}, set(above))
- self.assertEqual({'xyz'}, set(marked))
-
- # [START model_pardo_with_side_outputs_iter]
- below, above, marked = (words | df.ParDo(ProcessWords(), cutoff_length=2, marker='x')
- .with_outputs('above_cutoff_lengths',
- 'marked strings',
- main='below_cutoff_strings'))
- # [END model_pardo_with_side_outputs_iter]
-
- self.assertEqual({'a', 'an'}, set(below))
- self.assertEqual({3, 5}, set(above))
- self.assertEqual({'xyz'}, set(marked))
-
- def test_pardo_with_undeclared_side_outputs(self):
- numbers = [1, 2, 3, 4, 5, 10, 20]
- # [START model_pardo_with_side_outputs_undeclared]
- def even_odd(x):
- yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
- if x % 10 == 0:
- yield x
-
- results = numbers | df.FlatMap(even_odd).with_outputs()
-
- evens = results.even
- odds = results.odd
- tens = results[None] # the undeclared main output
- # [END model_pardo_with_side_outputs_undeclared]
-
- self.assertEqual({2, 4, 10, 20}, set(evens))
- self.assertEqual({1, 3, 5}, set(odds))
- self.assertEqual({10, 20}, set(tens))
-
-
-class TypeHintsTest(unittest.TestCase):
-
- def test_bad_types(self):
- p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
-
- # [START type_hints_missing_define_numbers]
- numbers = p | df.Create(['1', '2', '3'])
- # [END type_hints_missing_define_numbers]
-
- # Consider the following code.
- # [START type_hints_missing_apply]
- evens = numbers | df.Filter(lambda x: x % 2 == 0)
- # [END type_hints_missing_apply]
-
-    # Now suppose numbers was defined as [snippet above].
- # When running this pipeline, you'd get a runtime error,
- # possibly on a remote machine, possibly very late.
-
- with self.assertRaises(TypeError):
- p.run()
-
- # To catch this early, we can assert what types we expect.
- with self.assertRaises(typehints.TypeCheckError):
- # [START type_hints_takes]
- p.options.view_as(TypeOptions).pipeline_type_check = True
- evens = numbers | df.Filter(lambda x: x % 2 == 0).with_input_types(int)
- # [END type_hints_takes]
-
- # Type hints can be declared on DoFns and callables as well, rather
-    # than where they're used, to be more self-contained.
- with self.assertRaises(typehints.TypeCheckError):
- # [START type_hints_do_fn]
- @df.typehints.with_input_types(int)
- class FilterEvensDoFn(df.DoFn):
- def process(self, context):
- if context.element % 2 == 0:
- yield context.element
- evens = numbers | df.ParDo(FilterEvensDoFn())
- # [END type_hints_do_fn]
-
- words = p | df.Create('words', ['a', 'bb', 'c'])
- # One can assert outputs and apply them to transforms as well.
-    # This documents the contract and checks it at pipeline construction time.
- # [START type_hints_transform]
- T = df.typehints.TypeVariable('T')
- @df.typehints.with_input_types(T)
- @df.typehints.with_output_types(df.typehints.Tuple[int, T])
- class MyTransform(df.PTransform):
- def apply(self, pcoll):
- return pcoll | df.Map(lambda x: (len(x), x))
-
- words_with_lens = words | MyTransform()
- # [END type_hints_transform]
-
- with self.assertRaises(typehints.TypeCheckError):
- words_with_lens | df.Map(lambda x: x).with_input_types(
- df.typehints.Tuple[int, int])
-
- def test_runtime_checks_off(self):
- p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
- # [START type_hints_runtime_off]
- p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
- p.run()
- # [END type_hints_runtime_off]
-
- def test_runtime_checks_on(self):
- p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
- with self.assertRaises(typehints.TypeCheckError):
- # [START type_hints_runtime_on]
- p.options.view_as(TypeOptions).runtime_type_check = True
- p | df.Create(['a']) | df.Map(lambda x: 3).with_output_types(str)
- p.run()
- # [END type_hints_runtime_on]
-
- def test_deterministic_key(self):
- p = df.Pipeline('DirectPipelineRunner', argv=sys.argv)
- lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']
-
- # [START type_hints_deterministic_key]
- class Player(object):
- def __init__(self, team, name):
- self.team = team
- self.name = name
-
- class PlayerCoder(df.coders.Coder):
- def encode(self, player):
- return '%s:%s' % (player.team, player.name)
-
- def decode(self, s):
- return Player(*s.split(':'))
-
- def is_deterministic(self):
- return True
-
- df.coders.registry.register_coder(Player, PlayerCoder)
-
- def parse_player_and_score(csv):
- name, team, score = csv.split(',')
- return Player(team, name), int(score)
-
- totals = (
- lines
- | df.Map(parse_player_and_score)
- | df.CombinePerKey(sum).with_input_types(df.typehints.Tuple[Player, int]))
- # [END type_hints_deterministic_key]
-
- self.assertEquals(
- {('banana', 3), ('kiwi', 4), ('zucchini', 3)},
- set(totals | df.Map(lambda (k, v): (k.name, v))))
-
-
-class SnippetsTest(unittest.TestCase):
-
- def create_temp_file(self, contents=''):
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
- return f.name
-
- def get_output(self, path, sorted_output=True, suffix=''):
- with open(path + '-00000-of-00001' + suffix) as f:
- lines = f.readlines()
- if sorted_output:
- return sorted(s.rstrip('\n') for s in lines)
- else:
- return [s.rstrip('\n') for s in lines]
-
- def test_model_pipelines(self):
- temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
- result_path = temp_path + '.result'
- snippets.model_pipelines([
- '--input=%s*' % temp_path,
- '--output=%s' % result_path])
- self.assertEqual(
- self.get_output(result_path),
- [str(s) for s in [(u'aa', 1), (u'bb', 2), (u'cc', 3)]])
-
- def test_model_pcollection(self):
- temp_path = self.create_temp_file()
- snippets.model_pcollection(['--output=%s' % temp_path])
- self.assertEqual(self.get_output(temp_path, sorted_output=False), [
- 'To be, or not to be: that is the question: ',
- 'Whether \'tis nobler in the mind to suffer ',
- 'The slings and arrows of outrageous fortune, ',
- 'Or to take arms against a sea of troubles, '])
-
- def test_construct_pipeline(self):
- temp_path = self.create_temp_file(
- 'abc def ghi\n jkl mno pqr\n stu vwx yz')
- result_path = self.create_temp_file()
- snippets.construct_pipeline({'read': temp_path, 'write': result_path})
- self.assertEqual(
- self.get_output(result_path),
- ['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy'])
-
- def test_model_textio(self):
- temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
- result_path = temp_path + '.result'
- snippets.model_textio({'read': temp_path, 'write': result_path})
- self.assertEqual(
- ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
- self.get_output(result_path, suffix='.csv'))
-
- def test_model_bigqueryio(self):
-    # We cannot test BigQueryIO functionality in unit tests, so we limit
-    # ourselves to making sure that a pipeline containing BigQuery sources and
-    # sinks can be built.
- self.assertEqual(None, snippets.model_bigqueryio())
-
- def _run_test_pipeline_for_options(self, fn):
- temp_path = self.create_temp_file('aa\nbb\ncc')
- result_path = temp_path + '.result'
- fn([
- '--input=%s*' % temp_path,
- '--output=%s' % result_path])
- self.assertEqual(
- ['aa', 'bb', 'cc'],
- self.get_output(result_path))
-
- def test_pipeline_options_local(self):
- self._run_test_pipeline_for_options(snippets.pipeline_options_local)
-
- def test_pipeline_options_remote(self):
- self._run_test_pipeline_for_options(snippets.pipeline_options_remote)
-
- def test_pipeline_options_command_line(self):
- self._run_test_pipeline_for_options(snippets.pipeline_options_command_line)
-
- def test_pipeline_logging(self):
- result_path = self.create_temp_file()
- lines = ['we found love right where we are',
- 'we found love right from the start',
- 'we found love in a hopeless place']
- snippets.pipeline_logging(lines, result_path)
- self.assertEqual(
- sorted(' '.join(lines).split(' ')),
- self.get_output(result_path))
-
- def test_examples_wordcount(self):
- pipelines = [snippets.examples_wordcount_minimal,
- snippets.examples_wordcount_wordcount,
- snippets.pipeline_monitoring]
-
- for pipeline in pipelines:
- temp_path = self.create_temp_file(
- 'abc def ghi\n abc jkl')
- result_path = self.create_temp_file()
- pipeline({'read': temp_path, 'write': result_path})
- self.assertEqual(
- self.get_output(result_path),
- ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1'])
-
- def test_examples_wordcount_debugging(self):
- temp_path = self.create_temp_file(
- 'Flourish Flourish Flourish stomach abc def')
- result_path = self.create_temp_file()
- snippets.examples_wordcount_debugging(
- {'read': temp_path, 'write': result_path})
- self.assertEqual(
- self.get_output(result_path),
- ['Flourish: 3', 'stomach: 1'])
-
- def test_model_composite_transform_example(self):
- contents = ['aa bb cc', 'bb cc', 'cc']
- result_path = self.create_temp_file()
- snippets.model_composite_transform_example(contents, result_path)
- self.assertEqual(['aa: 1', 'bb: 2', 'cc: 3'], self.get_output(result_path))
-
- def test_model_multiple_pcollections_flatten(self):
- contents = ['a', 'b', 'c', 'd', 'e', 'f']
- result_path = self.create_temp_file()
- snippets.model_multiple_pcollections_flatten(contents, result_path)
- self.assertEqual(contents, self.get_output(result_path))
-
- def test_model_multiple_pcollections_partition(self):
- contents = [17, 42, 64, 32, 0, 99, 53, 89]
- result_path = self.create_temp_file()
- snippets.model_multiple_pcollections_partition(contents, result_path)
- self.assertEqual(['0', '17', '32', '42', '53', '64', '89', '99'],
- self.get_output(result_path))
-
- def test_model_group_by_key(self):
- contents = ['a bb ccc bb bb a']
- result_path = self.create_temp_file()
- snippets.model_group_by_key(contents, result_path)
- expected = [('a', 2), ('bb', 3), ('ccc', 1)]
- self.assertEqual([str(s) for s in expected], self.get_output(result_path))
-
- def test_model_co_group_by_key_tuple(self):
- email_list = [['a', 'a@example.com'], ['b', 'b@example.com']]
- phone_list = [['a', 'x4312'], ['b', 'x8452']]
- result_path = self.create_temp_file()
- snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
- expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
- self.assertEqual(expect, self.get_output(result_path))
-
-
-class CombineTest(unittest.TestCase):
- """Tests for dataflow/model/combine."""
-
- def test_global_sum(self):
- pc = [1, 2, 3]
- # [START global_sum]
- result = pc | df.CombineGlobally(sum)
- # [END global_sum]
- self.assertEqual([6], result)
-
- def test_combine_values(self):
-    occurrences = [('cat', 1), ('cat', 5), ('cat', 9), ('dog', 5), ('dog', 2)]
-    # [START combine_values]
-    first_occurrences = occurrences | df.GroupByKey() | df.CombineValues(min)
-    # [END combine_values]
-    self.assertEqual({('cat', 1), ('dog', 2)}, set(first_occurrences))
-
- def test_combine_per_key(self):
- player_accuracies = [
- ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1),
- ('dog', 5), ('dog', 2)]
- # [START combine_per_key]
- avg_accuracy_per_player = player_accuracies | df.CombinePerKey(df.combiners.MeanCombineFn())
- # [END combine_per_key]
- self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player))
-
- def test_combine_concat(self):
- pc = ['a', 'b']
- # [START combine_concat]
- def concat(values, separator=', '):
- return separator.join(values)
- with_commas = pc | df.CombineGlobally(concat)
- with_dashes = pc | df.CombineGlobally(concat, separator='-')
- # [END combine_concat]
- self.assertEqual(1, len(with_commas))
- self.assertTrue(with_commas[0] in {'a, b', 'b, a'})
- self.assertEqual(1, len(with_dashes))
- self.assertTrue(with_dashes[0] in {'a-b', 'b-a'})
-
- def test_bounded_sum(self):
- # [START combine_bounded_sum]
- pc = [1, 10, 100, 1000]
- def bounded_sum(values, bound=500):
- return min(sum(values), bound)
- small_sum = pc | df.CombineGlobally(bounded_sum) # [500]
- large_sum = pc | df.CombineGlobally(bounded_sum, bound=5000) # [1111]
- # [END combine_bounded_sum]
- self.assertEqual([500], small_sum)
- self.assertEqual([1111], large_sum)
-
- def test_combine_reduce(self):
- factors = [2, 3, 5, 7]
- # [START combine_reduce]
- import functools
- import operator
- product = factors | df.CombineGlobally(functools.partial(reduce, operator.mul), 1)
- # [END combine_reduce]
- self.assertEqual([210], product)
-
- def test_custom_average(self):
- pc = [2, 3, 5, 7]
-
-
- # [START combine_custom_average]
- class AverageFn(df.CombineFn):
- def create_accumulator(self):
- return (0.0, 0)
- def add_input(self, (sum, count), input):
- return sum + input, count + 1
- def merge_accumulators(self, accumulators):
- sums, counts = zip(*accumulators)
- return sum(sums), sum(counts)
- def extract_output(self, (sum, count)):
- return sum / count if count else float('NaN')
- average = pc | df.CombineGlobally(AverageFn())
- # [END combine_custom_average]
- self.assertEqual([4.25], average)
-
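For readers new to the CombineFn contract exercised above, here is an
SDK-independent sketch (illustrative only, not the runner's actual code, and
the helper name is invented) of how a runner could drive its four methods:
accumulate two shards separately, merge the partial accumulators, then extract
the final value.

def combine_in_shards(combine_fn, shard_a, shard_b):
  """Drive create_accumulator/add_input/merge_accumulators/extract_output."""
  acc_a = combine_fn.create_accumulator()
  for value in shard_a:
    acc_a = combine_fn.add_input(acc_a, value)
  acc_b = combine_fn.create_accumulator()
  for value in shard_b:
    acc_b = combine_fn.add_input(acc_b, value)
  merged = combine_fn.merge_accumulators([acc_a, acc_b])
  return combine_fn.extract_output(merged)

# combine_in_shards(AverageFn(), [2, 3], [5, 7]) == 4.25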
- def test_keys(self):
- occurrences = [('cat', 1), ('cat', 5), ('dog', 5), ('cat', 9), ('dog', 2)]
- unique_keys = occurrences | snippets.Keys()
- self.assertEqual({'cat', 'dog'}, set(unique_keys))
-
- def test_count(self):
- occurrences = ['cat', 'dog', 'cat', 'cat', 'dog']
- perkey_counts = occurrences | snippets.Count()
- self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/streaming_wordcap.py b/sdks/python/google/cloud/dataflow/examples/streaming_wordcap.py
deleted file mode 100644
index 67efb96..0000000
--- a/sdks/python/google/cloud/dataflow/examples/streaming_wordcap.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A streaming string-capitalization workflow.
-
-Important: streaming pipeline support in Python Dataflow is in development
-and is not yet available for use.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-
-import google.cloud.dataflow as df
-
-
-def run(argv=None):
- """Build and run the pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--input_topic', dest='input_topic', required=True,
- help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
- parser.add_argument(
- '--output_topic', dest='output_topic', required=True,
- help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = df.Pipeline(argv=pipeline_args)
-
-  # Read from the PubSub topic into a PCollection.
- lines = p | df.io.Read(
- 'read', df.io.PubSubSource(known_args.input_topic))
-
- # Capitalize the characters in each line.
- transformed = (lines
- | (df.Map('capitalize', lambda x: x.upper())))
-
- # Write to PubSub.
- # pylint: disable=expression-not-assigned
- transformed | df.io.Write(
- 'pubsub_write', df.io.PubSubSink(known_args.output_topic))
-
- p.run()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/streaming_wordcount.py b/sdks/python/google/cloud/dataflow/examples/streaming_wordcount.py
deleted file mode 100644
index 210d301..0000000
--- a/sdks/python/google/cloud/dataflow/examples/streaming_wordcount.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A streaming word-counting workflow.
-
-Important: streaming pipeline support in Python Dataflow is in development
-and is not yet available for use.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-
-import google.cloud.dataflow as df
-import google.cloud.dataflow.transforms.window as window
-
-
-def run(argv=None):
- """Build and run the pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--input_topic', required=True,
- help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
- parser.add_argument(
- '--output_topic', required=True,
- help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = df.Pipeline(argv=pipeline_args)
-
-  # Read from the PubSub topic into a PCollection.
- lines = p | df.io.Read(
- 'read', df.io.PubSubSource(known_args.input_topic))
-
-  # Count the occurrences of each word within fixed 15-second windows.
- transformed = (lines
- | (df.FlatMap('split',
- lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
- | df.Map('pair_with_one', lambda x: (x, 1))
- | df.WindowInto(window.FixedWindows(15, 0))
- | df.GroupByKey('group')
- | df.Map('count', lambda (word, ones): (word, sum(ones)))
- | df.Map('format', lambda tup: '%s: %d' % tup))
-
- # Write to PubSub.
- # pylint: disable=expression-not-assigned
- transformed | df.io.Write(
- 'pubsub_write', df.io.PubSubSink(known_args.output_topic))
-
- p.run()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount.py b/sdks/python/google/cloud/dataflow/examples/wordcount.py
deleted file mode 100644
index cf87268..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A word-counting workflow."""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-import google.cloud.dataflow as df
-
-
-empty_line_aggregator = df.Aggregator('emptyLines')
-average_word_size_aggregator = df.Aggregator('averageWordLength',
- df.combiners.MeanCombineFn(),
- float)
-
-
-class WordExtractingDoFn(df.DoFn):
- """Parse each line of input text into words."""
-
- def process(self, context):
- """Returns an iterator over the words of this element.
-
- The element is a line of text. If the line is blank, note that, too.
-
- Args:
- context: the call-specific context: data and aggregator.
-
- Returns:
-      The words in this line of text.
- """
- text_line = context.element.strip()
- if not text_line:
- context.aggregate_to(empty_line_aggregator, 1)
- words = re.findall(r'[A-Za-z\']+', text_line)
- for w in words:
- context.aggregate_to(average_word_size_aggregator, len(w))
- return words
-
-
-def run(argv=None):
- """Main entry point; defines and runs the wordcount pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--input',
- dest='input',
- default='gs://dataflow-samples/shakespeare/kinglear.txt',
- help='Input file to process.')
- parser.add_argument('--output',
- dest='output',
- required=True,
- help='Output file to write results to.')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = df.Pipeline(argv=pipeline_args)
-
- # Read the text file[pattern] into a PCollection.
- lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
-
- # Count the occurrences of each word.
- counts = (lines
- | (df.ParDo('split', WordExtractingDoFn())
- .with_output_types(unicode))
- | df.Map('pair_with_one', lambda x: (x, 1))
- | df.GroupByKey('group')
- | df.Map('count', lambda (word, ones): (word, sum(ones))))
-
- # Format the counts into a PCollection of strings.
- output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
-
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- output | df.io.Write('write', df.io.TextFileSink(known_args.output))
-
- # Actually run the pipeline (all operations above are deferred).
- result = p.run()
- empty_line_values = result.aggregated_values(empty_line_aggregator)
- logging.info('number of empty lines: %d', sum(empty_line_values.values()))
- word_length_values = result.aggregated_values(average_word_size_aggregator)
- logging.info('average word lengths: %s', word_length_values.values())
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount_debugging.py b/sdks/python/google/cloud/dataflow/examples/wordcount_debugging.py
deleted file mode 100644
index 66d4eb1..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount_debugging.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""An example that verifies the counts and includes Dataflow best practices.
-
-On top of the basic concepts in the wordcount example, this workflow introduces
-logging to Cloud Logging and the use of assertions in a Dataflow pipeline.
-
-To execute this pipeline locally, specify a local output file or output prefix
-on GCS::
-
- --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
-
-To execute this pipeline using the Google Cloud Dataflow service, specify
-pipeline configuration::
-
- --project YOUR_PROJECT_ID
- --staging_location gs://YOUR_STAGING_DIRECTORY
- --temp_location gs://YOUR_TEMP_DIRECTORY
- --job_name YOUR_JOB_NAME
- --runner BlockingDataflowPipelineRunner
-
-and an output prefix on GCS::
-
- --output gs://YOUR_OUTPUT_PREFIX
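-
-As a purely illustrative sketch (this exact command line is an assumption and
-not taken from the README), a local run might look like::
-
-  python -m google.cloud.dataflow.examples.wordcount_debugging \
-      --output /tmp/wordcount_debugging_output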
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-import google.cloud.dataflow as df
-
-
-class FilterTextFn(df.DoFn):
- """A DoFn that filters for a specific key based on a regular expression."""
-
-  # A custom aggregator can track values in your pipeline as it runs. Those
-  # values will be displayed in the Dataflow Monitoring UI when this pipeline
-  # is run using the Dataflow service. The aggregators below track the number
-  # of matched and unmatched words. Learn more about the Dataflow Monitoring
-  # UI at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf.
- matched_words = df.Aggregator('matched_words')
-  unmatched_words = df.Aggregator('unmatched_words')
-
- def __init__(self, pattern):
- super(FilterTextFn, self).__init__()
- self.pattern = pattern
-
- def process(self, context):
- word, _ = context.element
- if re.match(self.pattern, word):
- # Log at INFO level each element we match. When executing this pipeline
- # using the Dataflow service, these log lines will appear in the Cloud
- # Logging UI.
- logging.info('Matched %s', word)
- context.aggregate_to(self.matched_words, 1)
- yield context.element
- else:
-      # Log at the "DEBUG" level each element that is not matched. Different
-      # log levels can be used to control the verbosity of logging, providing
-      # an effective mechanism to filter less important information.
-      # Note that currently only logs at the "INFO" level and higher are
-      # emitted to Cloud Logging, so this message will not be visible there.
- logging.debug('Did not match %s', word)
-      context.aggregate_to(self.unmatched_words, 1)
-
-
-class CountWords(df.PTransform):
- """A transform to count the occurrences of each word.
-
- A PTransform that converts a PCollection containing lines of text into a
- PCollection of (word, count) tuples.
- """
-
- def __init__(self):
- super(CountWords, self).__init__()
-
- def apply(self, pcoll):
- return (pcoll
- | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
- | df.Map('pair_with_one', lambda x: (x, 1))
- | df.GroupByKey('group')
- | df.Map('count', lambda (word, ones): (word, sum(ones))))
-
-
-def run(argv=None):
- """Runs the debugging wordcount pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--input',
- dest='input',
- default='gs://dataflow-samples/shakespeare/kinglear.txt',
- help='Input file to process.')
- parser.add_argument('--output',
- dest='output',
- required=True,
- help='Output file to write results to.')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = df.Pipeline(argv=pipeline_args)
-
-  # Read the text file (or file pattern) into a PCollection, count the
-  # occurrences of each word, and filter by a list of words.
- filtered_words = (
- p | df.io.Read('read', df.io.TextFileSource(known_args.input))
- | CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
-
-  # assert_that is a convenient PTransform that checks that a PCollection has
-  # an expected value. Asserts are best used in unit tests with small data
-  # sets, but one is demonstrated here as a teaching tool.
-  #
-  # Note that assert_that produces no output; successful completion of the
-  # pipeline implies that the expectations were met. Learn more about testing
-  # your pipeline at
-  # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline.
- df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
-
- # Format the counts into a PCollection of strings and write the output using a
- # "Write" transform that has side effects.
- # pylint: disable=unused-variable
- output = (filtered_words
- | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
- | df.io.Write('write', df.io.TextFileSink(known_args.output)))
-
- # Actually run the pipeline (all operations above are deferred).
- p.run()
-
-
-if __name__ == '__main__':
-  # Cloud Logging captures only logs at the logging.INFO level and higher
-  # emitted through the root logger; those statements will be visible in the
-  # Cloud Logging UI. Learn more about Cloud Logging at
-  # https://cloud.google.com/logging.
-  #
-  # You can set the default logging level to a different level when running
-  # locally.
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount_debugging_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount_debugging_test.py b/sdks/python/google/cloud/dataflow/examples/wordcount_debugging_test.py
deleted file mode 100644
index aa517d6..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount_debugging_test.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the debugging wordcount example."""
-
-import logging
-import re
-import tempfile
-import unittest
-
-from google.cloud.dataflow.examples import wordcount_debugging
-
-
-class WordCountTest(unittest.TestCase):
-
- SAMPLE_TEXT = 'xx yy Flourish\n zz Flourish Flourish stomach\n aa\n bb cc dd'
-
- def create_temp_file(self, contents):
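-    # delete=False keeps the temp file on disk after the handle is closed so
-    # that the pipeline under test can read it back later.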
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
- return f.name
-
- def get_results(self, temp_path):
- results = []
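-    # The text sink names its output shards <prefix>-SSSSS-of-NNNNN; this
-    # small local run produces a single shard.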
- with open(temp_path + '.result-00000-of-00001') as result_file:
- for line in result_file:
- match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
- if match is not None:
- results.append((match.group(1), int(match.group(2))))
- return results
-
- def test_basics(self):
- temp_path = self.create_temp_file(self.SAMPLE_TEXT)
- expected_words = [('Flourish', 3), ('stomach', 1)]
- wordcount_debugging.run([
- '--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
-
- # Parse result file and compare.
- results = self.get_results(temp_path)
- self.assertEqual(sorted(results), sorted(expected_words))
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount_minimal.py b/sdks/python/google/cloud/dataflow/examples/wordcount_minimal.py
deleted file mode 100644
index 69f3986..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount_minimal.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A minimalist word-counting workflow that counts words in Shakespeare.
-
-This is the first in a series of successively more detailed 'word count'
-examples.
-
-Next, see the wordcount pipeline, then the wordcount_debugging pipeline, for
-more detailed examples that introduce additional concepts.
-
-Concepts:
-
-1. Reading data from text files
-2. Specifying 'inline' transforms
-3. Counting a PCollection
-4. Writing data to Cloud Storage as text files
-
-To execute this pipeline locally, first edit the code to specify the output
-location, which can be a local file path or an output prefix on GCS. (Only
-update the output location marked with the first CHANGE comment.)
-
-To execute this pipeline remotely, first edit the code to set your project ID,
-runner type, the staging location, the temp location, and the output location.
-The specified GCS bucket(s) must already exist. (Update all the places marked
-with a CHANGE comment.)
-
-Then, run the pipeline as described in the README. It will be deployed and run
-using the Google Cloud Dataflow Service. No args are required to run the
-pipeline. You can see the results in your output bucket in the GCS browser.
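-
-As a purely illustrative sketch (the module invocation shown is an assumption
-rather than a quote from the README), such a run could be as simple as::
-
-  python -m google.cloud.dataflow.examples.wordcount_minimal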
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-import google.cloud.dataflow as df
-
-
-def run(argv=None):
- """Main entry point; defines and runs the wordcount pipeline."""
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--input',
- dest='input',
- default='gs://dataflow-samples/shakespeare/kinglear.txt',
- help='Input file to process.')
- parser.add_argument('--output',
- dest='output',
- # CHANGE 1/5: The Google Cloud Storage path is required
- # for outputting the results.
- default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
- help='Output file to write results to.')
- known_args, pipeline_args = parser.parse_known_args(argv)
-
- pipeline_args.extend([
- # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
- # run your pipeline on the Google Cloud Dataflow Service.
- '--runner=DirectPipelineRunner',
- # CHANGE 3/5: Your project ID is required in order to run your pipeline on
- # the Google Cloud Dataflow Service.
- '--project=SET_YOUR_PROJECT_ID_HERE',
- # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
- # files.
- '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',
- # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
- # files.
- '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',
- '--job_name=your-wordcount-job',
- ])
-
- p = df.Pipeline(argv=pipeline_args)
-
-  # Read the text file (or file pattern) into a PCollection.
- lines = p | df.io.Read('read', df.io.TextFileSource(known_args.input))
-
- # Count the occurrences of each word.
- counts = (lines
- | (df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
- | df.Map('pair_with_one', lambda x: (x, 1))
- | df.GroupByKey('group')
- | df.Map('count', lambda (word, ones): (word, sum(ones))))
-
- # Format the counts into a PCollection of strings.
- output = counts | df.Map('format', lambda (word, c): '%s: %s' % (word, c))
-
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- output | df.io.Write('write', df.io.TextFileSink(known_args.output))
-
- # Actually run the pipeline (all operations above are deferred).
- p.run()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount_minimal_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount_minimal_test.py b/sdks/python/google/cloud/dataflow/examples/wordcount_minimal_test.py
deleted file mode 100644
index f110c21..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount_minimal_test.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the minimal wordcount example."""
-
-import collections
-import logging
-import re
-import tempfile
-import unittest
-
-from google.cloud.dataflow.examples import wordcount_minimal
-
-
-class WordCountMinimalTest(unittest.TestCase):
- """Unit test for wordcount_minimal example with direct runner."""
-
- SAMPLE_TEXT = 'a b c a b a\n aa bb cc aa bb aa'
-
- def create_temp_file(self, contents):
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
- return f.name
-
- def test_basics(self):
- temp_path = self.create_temp_file(self.SAMPLE_TEXT)
- expected_words = collections.defaultdict(int)
- for word in re.findall(r'\w+', self.SAMPLE_TEXT):
- expected_words[word] += 1
- wordcount_minimal.run([
- '--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
- # Parse result file and compare.
- results = []
- with open(temp_path + '.result-00000-of-00001') as result_file:
- for line in result_file:
- match = re.search(r'([a-z]+): ([0-9]+)', line)
- if match is not None:
- results.append((match.group(1), int(match.group(2))))
- self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/wordcount_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/wordcount_test.py b/sdks/python/google/cloud/dataflow/examples/wordcount_test.py
deleted file mode 100644
index 72b1e32..0000000
--- a/sdks/python/google/cloud/dataflow/examples/wordcount_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the wordcount example."""
-
-import collections
-import logging
-import re
-import tempfile
-import unittest
-
-from google.cloud.dataflow.examples import wordcount
-
-
-class WordCountTest(unittest.TestCase):
-
- SAMPLE_TEXT = 'a b c a b a\n\n aa bb cc aa bb aa'
-
- def create_temp_file(self, contents):
- with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
- return f.name
-
- def test_basics(self):
- temp_path = self.create_temp_file(self.SAMPLE_TEXT)
- expected_words = collections.defaultdict(int)
- for word in re.findall(r'\w+', self.SAMPLE_TEXT):
- expected_words[word] += 1
- wordcount.run([
- '--input=%s*' % temp_path,
- '--output=%s.result' % temp_path])
- # Parse result file and compare.
- results = []
- with open(temp_path + '.result-00000-of-00001') as result_file:
- for line in result_file:
- match = re.search(r'([a-z]+): ([0-9]+)', line)
- if match is not None:
- results.append((match.group(1), int(match.group(2))))
- self.assertEqual(sorted(results), sorted(expected_words.iteritems()))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/__init__.py b/sdks/python/google/cloud/dataflow/internal/__init__.py
deleted file mode 100644
index e69de29..0000000