You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:43:07 UTC
[3/4] incubator-beam git commit: Refactor examples to use
save_main_session
Refactor examples to use save_main_session
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d64a8c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d64a8c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d64a8c5
Branch: refs/heads/python-sdk
Commit: 3d64a8c5abf3dc9547ab75765cbd3e1ef5fac268
Parents: 2e36602
Author: Silviu Calinoiu <si...@google.com>
Authored: Sat Jul 23 09:39:59 2016 -0700
Committer: Silviu Calinoiu <si...@google.com>
Committed: Sat Jul 23 09:39:59 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/examples/complete/autocomplete.py | 9 +++++++--
sdks/python/apache_beam/examples/complete/estimate_pi.py | 8 +++++++-
sdks/python/apache_beam/examples/complete/tfidf.py | 8 +++++++-
.../examples/complete/top_wikipedia_sessions.py | 10 ++++++++--
.../apache_beam/examples/cookbook/bigquery_side_input.py | 8 +++++++-
sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 9 +++++++--
sdks/python/apache_beam/examples/cookbook/coders.py | 7 +++++++
.../apache_beam/examples/cookbook/group_with_coder.py | 9 +++++++--
.../python/apache_beam/examples/cookbook/mergecontacts.py | 9 +++++++--
.../examples/cookbook/multiple_output_pardo.py | 9 +++++++--
sdks/python/apache_beam/examples/wordcount.py | 9 +++++++--
sdks/python/apache_beam/examples/wordcount_debugging.py | 9 +++++++--
sdks/python/apache_beam/examples/wordcount_minimal.py | 9 +++++++--
13 files changed, 92 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 2fb2d92..0f1e96e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def run(argv=None):
@@ -36,8 +38,11 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
| beam.io.Read('read', beam.io.TextFileSource(known_args.input))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 3c4a2d9..09faecf 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -36,6 +36,8 @@ import apache_beam as beam
from apache_beam.typehints import Any
from apache_beam.typehints import Iterable
from apache_beam.typehints import Tuple
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
@beam.typehints.with_output_types(Tuple[int, int, int])
@@ -106,8 +108,12 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
- p = beam.Pipeline(argv=pipeline_args)
(p # pylint: disable=expression-not-assigned
| EstimatePiTransform('Estimate')
| beam.io.Write('Write',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index cd85651..ef58cc0 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -30,6 +30,8 @@ import re
import apache_beam as beam
from apache_beam.pvalue import AsSingleton
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def read_documents(pipeline, uris):
@@ -183,8 +185,12 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
- p = beam.Pipeline(argv=pipeline_args)
# Read documents specified by the uris command line option.
pcoll = read_documents(p, glob.glob(known_args.uris))
# Compute TF-IDF information for each word.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7337910..c46bfc5 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,6 +46,9 @@ import logging
import apache_beam as beam
from apache_beam import combiners
from apache_beam import window
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
+
ONE_HOUR_IN_SECONDS = 3600
THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
@@ -158,8 +161,11 @@ def run(argv=None):
default=0.1,
help='Fraction of entries used for session tracking')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
| beam.Read('read', beam.io.TextFileSource(known_args.input))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index c26f6cd..e1d9cf1 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -36,6 +36,8 @@ import apache_beam as beam
from apache_beam.pvalue import AsIter
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):
@@ -84,7 +86,11 @@ def run(argv=None):
parser.add_argument('--num_groups')
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
group_ids = []
for i in xrange(0, int(known_args.num_groups)):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 0b5da02..cde00b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,6 +25,8 @@ import logging
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def crc32line(line):
@@ -44,8 +46,11 @@ def run(argv=None):
parser.add_argument('--checksum_output',
help='Checksum output file pattern.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index f5158c2..1ce1fa5 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,6 +35,8 @@ import json
import logging
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
class JsonCoder(object):
@@ -77,6 +79,11 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
p = beam.Pipeline(argv=pipeline_args)
(p # pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 4da63d4..140314e 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -35,6 +35,8 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.typehints import typehints
from apache_beam.typehints.decorators import with_output_types
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
class Player(object):
@@ -85,8 +87,11 @@ def run(argv=sys.argv[1:]):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Register the custom coder for the Player class, so that it will be used in
# the computation.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index bf8f4d0..9e6b001 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,6 +36,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def run(argv=None, assert_results=None):
@@ -60,8 +62,11 @@ def run(argv=None, assert_results=None):
required=True,
help='Output file for statistics about the input.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Helper: read a tab-separated key-value mapping from a text file, escape all
# quotes/backslashes, and convert it to a PCollection of (key, value) pairs.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 48b8b1e..5bde591 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,6 +54,8 @@ import re
import apache_beam as beam
from apache_beam import pvalue
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
class SplitLinesToWordsFn(beam.DoFn):
@@ -129,8 +131,11 @@ def run(argv=None):
required=True,
help='Output prefix for files to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index fc02e91..bbfd43e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,6 +24,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
empty_line_aggregator = beam.Aggregator('emptyLines')
@@ -68,8 +70,11 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index c989b6a..74effed 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,6 +46,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
class FilterTextFn(beam.DoFn):
@@ -115,8 +117,11 @@ def run(argv=None):
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection, count the occurrences of
# each word and filter by a list of words.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 1293686..c3c41d7 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,6 +51,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
def run(argv=None):
@@ -68,7 +70,6 @@ def run(argv=None):
default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
-
pipeline_args.extend([
# CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowPipelineRunner to
# run your pipeline on the Google Cloud Dataflow Service.
@@ -85,7 +86,11 @@ def run(argv=None):
'--job_name=your-wordcount-job',
])
- p = beam.Pipeline(argv=pipeline_args)
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+ p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))