You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:43:07 UTC

[3/4] incubator-beam git commit: Refactor examples to use save_main_session

Refactor examples to use save_main_session


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d64a8c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d64a8c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d64a8c5

Branch: refs/heads/python-sdk
Commit: 3d64a8c5abf3dc9547ab75765cbd3e1ef5fac268
Parents: 2e36602
Author: Silviu Calinoiu <si...@google.com>
Authored: Sat Jul 23 09:39:59 2016 -0700
Committer: Silviu Calinoiu <si...@google.com>
Committed: Sat Jul 23 09:39:59 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/complete/autocomplete.py |  9 +++++++--
 sdks/python/apache_beam/examples/complete/estimate_pi.py  |  8 +++++++-
 sdks/python/apache_beam/examples/complete/tfidf.py        |  8 +++++++-
 .../examples/complete/top_wikipedia_sessions.py           | 10 ++++++++--
 .../apache_beam/examples/cookbook/bigquery_side_input.py  |  8 +++++++-
 sdks/python/apache_beam/examples/cookbook/bigshuffle.py   |  9 +++++++--
 sdks/python/apache_beam/examples/cookbook/coders.py       |  8 +++++++-
 .../apache_beam/examples/cookbook/group_with_coder.py     |  9 +++++++--
 .../python/apache_beam/examples/cookbook/mergecontacts.py |  9 +++++++--
 .../examples/cookbook/multiple_output_pardo.py            |  9 +++++++--
 sdks/python/apache_beam/examples/wordcount.py             |  9 +++++++--
 sdks/python/apache_beam/examples/wordcount_debugging.py   |  9 +++++++--
 sdks/python/apache_beam/examples/wordcount_minimal.py     |  9 +++++++--
 13 files changed, 92 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 2fb2d92..0f1e96e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def run(argv=None):
@@ -36,8 +38,11 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
    | beam.io.Read('read', beam.io.TextFileSource(known_args.input))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 3c4a2d9..09faecf 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -36,6 +36,8 @@ import apache_beam as beam
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 @beam.typehints.with_output_types(Tuple[int, int, int])
@@ -106,8 +108,12 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
-  p = beam.Pipeline(argv=pipeline_args)
   (p  # pylint: disable=expression-not-assigned
    | EstimatePiTransform('Estimate')
    | beam.io.Write('Write',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index cd85651..ef58cc0 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -30,6 +30,8 @@ import re
 
 import apache_beam as beam
 from apache_beam.pvalue import AsSingleton
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def read_documents(pipeline, uris):
@@ -183,8 +185,12 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
-  p = beam.Pipeline(argv=pipeline_args)
   # Read documents specified by the uris command line option.
   pcoll = read_documents(p, glob.glob(known_args.uris))
   # Compute TF-IDF information for each word.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7337910..c46bfc5 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,6 +46,9 @@ import logging
 import apache_beam as beam
 from apache_beam import combiners
 from apache_beam import window
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
+
 
 ONE_HOUR_IN_SECONDS = 3600
 THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
@@ -158,8 +161,11 @@ def run(argv=None):
                       default=0.1,
                       help='Fraction of entries used for session tracking')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
    | beam.Read('read', beam.io.TextFileSource(known_args.input))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index c26f6cd..e1d9cf1 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -36,6 +36,8 @@ import apache_beam as beam
 from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):
@@ -84,7 +86,11 @@ def run(argv=None):
   parser.add_argument('--num_groups')
 
   known_args, pipeline_args = parser.parse_known_args(argv)
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   group_ids = []
   for i in xrange(0, int(known_args.num_groups)):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 0b5da02..cde00b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,6 +25,8 @@ import logging
 
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def crc32line(line):
@@ -44,8 +46,11 @@ def run(argv=None):
   parser.add_argument('--checksum_output',
                       help='Checksum output file pattern.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index f5158c2..1ce1fa5 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,6 +35,8 @@ import json
 import logging
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 class JsonCoder(object):
@@ -77,6 +79,10 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
-  p = beam.Pipeline(argv=pipeline_args)
   (p  # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 4da63d4..140314e 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -35,6 +35,8 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.typehints import typehints
 from apache_beam.typehints.decorators import with_output_types
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 class Player(object):
@@ -85,8 +87,11 @@ def run(argv=sys.argv[1:]):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Register the custom coder for the Player class, so that it will be used in
   # the computation.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index bf8f4d0..9e6b001 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,6 +36,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def run(argv=None, assert_results=None):
@@ -60,8 +62,11 @@ def run(argv=None, assert_results=None):
                       required=True,
                       help='Output file for statistics about the input.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Helper: read a tab-separated key-value mapping from a text file, escape all
   # quotes/backslashes, and convert it a PCollection of (key, value) pairs.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 48b8b1e..5bde591 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,6 +54,8 @@ import re
 
 import apache_beam as beam
 from apache_beam import pvalue
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 class SplitLinesToWordsFn(beam.DoFn):
@@ -129,8 +131,11 @@ def run(argv=None):
                       required=True,
                       help='Output prefix for files to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index fc02e91..bbfd43e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 empty_line_aggregator = beam.Aggregator('emptyLines')
@@ -68,8 +70,11 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index c989b6a..74effed 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,6 +46,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 class FilterTextFn(beam.DoFn):
@@ -115,8 +117,11 @@ def run(argv=None):
                       required=True,
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection, count the occurrences of
   # each word and filter by a list of words.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 1293686..c3c41d7 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,6 +51,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def run(argv=None):
@@ -68,7 +70,6 @@ def run(argv=None):
                       default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
                       help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
   pipeline_args.extend([
       # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
       # run your pipeline on the Google Cloud Dataflow Service.
@@ -85,7 +86,11 @@ def run(argv=None):
       '--job_name=your-wordcount-job',
   ])
 
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))