You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:09 UTC
[03/12] incubator-beam git commit: Move names out of transform constructors.
Move names out of transform constructors.
sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'
A small number of tests will need to be fixed by hand.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce
Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/dataflow_test.py | 96 ++--
.../examples/complete/autocomplete.py | 8 +-
.../examples/complete/autocomplete_test.py | 4 +-
.../examples/complete/estimate_pi.py | 8 +-
.../examples/complete/estimate_pi_test.py | 2 +-
.../complete/juliaset/juliaset/juliaset.py | 8 +-
.../apache_beam/examples/complete/tfidf.py | 32 +-
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/complete/top_wikipedia_sessions.py | 8 +-
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../examples/cookbook/bigquery_schema.py | 4 +-
.../examples/cookbook/bigquery_side_input.py | 6 +-
.../cookbook/bigquery_side_input_test.py | 8 +-
.../examples/cookbook/bigquery_tornadoes.py | 6 +-
.../cookbook/bigquery_tornadoes_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 18 +-
.../apache_beam/examples/cookbook/coders.py | 2 +-
.../examples/cookbook/coders_test.py | 4 +-
.../examples/cookbook/custom_ptransform.py | 6 +-
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 10 +-
.../examples/cookbook/filters_test.py | 2 +-
.../examples/cookbook/group_with_coder.py | 6 +-
.../examples/cookbook/mergecontacts.py | 8 +-
.../examples/cookbook/multiple_output_pardo.py | 18 +-
.../apache_beam/examples/snippets/snippets.py | 62 +--
.../examples/snippets/snippets_test.py | 10 +-
.../apache_beam/examples/streaming_wordcap.py | 2 +-
.../apache_beam/examples/streaming_wordcount.py | 8 +-
sdks/python/apache_beam/examples/wordcount.py | 14 +-
.../apache_beam/examples/wordcount_debugging.py | 16 +-
.../apache_beam/examples/wordcount_minimal.py | 14 +-
sdks/python/apache_beam/io/avroio.py | 2 +-
sdks/python/apache_beam/io/bigquery.py | 4 +-
.../apache_beam/io/filebasedsource_test.py | 4 +-
sdks/python/apache_beam/io/iobase.py | 4 +-
sdks/python/apache_beam/pipeline_test.py | 42 +-
sdks/python/apache_beam/pvalue_test.py | 6 +-
.../consumer_tracking_pipeline_visitor_test.py | 4 +-
sdks/python/apache_beam/runners/runner_test.py | 6 +-
.../apache_beam/transforms/combiners_test.py | 48 +-
sdks/python/apache_beam/transforms/core.py | 16 +-
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 498 +++++++++----------
sdks/python/apache_beam/transforms/util.py | 8 +-
.../apache_beam/transforms/window_test.py | 14 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 16 +-
48 files changed, 537 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
def Count(pcoll): # pylint: disable=invalid-name, no-self-argument
"""A Count transform: v, ... => (v, n), ..."""
return (pcoll
- | Map('AddCount', lambda x: (x, 1))
- | GroupByKey('GroupCounts')
- | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+ | 'AddCount' >> Map(lambda x: (x, 1))
+ | 'GroupCounts' >> GroupByKey()
+ | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
def test_word_count(self):
pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
+ lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
result = (
- (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+ (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
.apply('CountWords', DataflowTest.Count))
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
pipeline.run()
def test_map(self):
pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('input', ['a', 'b', 'c'])
+ lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
result = (lines
- | Map('upper', str.upper)
- | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+ | 'upper' >> Map(str.upper)
+ | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
pipeline.run()
def test_par_do_with_side_input_as_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
- prefix = pipeline | Create('SomeString', ['xyz']) # side in
+ words = pipeline | 'SomeWords' >> Create(words_list)
+ prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in
suffix = 'zyx'
result = words | FlatMap(
'DecorateWords',
@@ -92,9 +92,9 @@ class DataflowTest(unittest.TestCase):
def test_par_do_with_side_input_as_keyword_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
+ words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
+ suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
result = words | FlatMap(
'DecorateWords',
lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
@@ -111,10 +111,10 @@ class DataflowTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
+ words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
- result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+ suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
+ result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()
@@ -131,7 +131,7 @@ class DataflowTest(unittest.TestCase):
yield SideOutputValue('odd', context.element)
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | ParDo(
'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -147,7 +147,7 @@ class DataflowTest(unittest.TestCase):
return [v, SideOutputValue('odd', v)]
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -157,37 +157,37 @@ class DataflowTest(unittest.TestCase):
def test_empty_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # Empty side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([]) # Empty side input.
def my_fn(k, s):
v = ('empty' if isinstance(s, EmptySideInput) else 'full')
return [(k, v)]
- result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+ result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
pipeline.run()
def test_multi_valued_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
+ pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
with self.assertRaises(ValueError):
pipeline.run()
def test_default_value_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # 0 values in side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([]) # 0 values in side input.
result = (
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+ pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
assert_that(result, equal_to([10, 20]))
pipeline.run()
def test_iterable_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
+ pcol = pipeline | 'start' >> Create([1, 2])
+ side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
result = pcol | FlatMap('compute',
lambda x, s: [x * y for y in s], AllOf(side))
assert_that(result, equal_to([3, 4, 6, 8]))
@@ -195,7 +195,7 @@ class DataflowTest(unittest.TestCase):
def test_undeclared_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -210,7 +210,7 @@ class DataflowTest(unittest.TestCase):
def test_empty_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 3, 5])
+ nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -224,9 +224,9 @@ class DataflowTest(unittest.TestCase):
a_list = [5, 1, 3, 2, 9]
some_pairs = [('crouton', 17), ('supreme', None)]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- side_pairs = pipeline | Create('side pairs', some_pairs)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
+ side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
results = main_input | FlatMap(
'concatenate',
lambda x, the_list, the_dict: [[x, the_list, the_dict]],
@@ -248,8 +248,8 @@ class DataflowTest(unittest.TestCase):
# with the same defaults will return the same PCollectionView.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
@@ -271,8 +271,8 @@ class DataflowTest(unittest.TestCase):
# distinct PCollectionViews with the same full_label.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
with self.assertRaises(RuntimeError) as e:
_ = main_input | FlatMap(
@@ -287,8 +287,8 @@ class DataflowTest(unittest.TestCase):
def test_as_singleton_with_different_defaults_with_unique_labels(self):
a_list = []
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
@@ -311,8 +311,8 @@ class DataflowTest(unittest.TestCase):
# return the same PCollectionView.
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -332,8 +332,8 @@ class DataflowTest(unittest.TestCase):
def test_as_list_with_unique_labels(self):
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -353,8 +353,8 @@ class DataflowTest(unittest.TestCase):
def test_as_dict_with_unique_labels(self):
some_kvs = [('a', 1), ('b', 2)]
pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_kvs = pipeline | Create('side kvs', some_kvs)
+ main_input = pipeline | 'main input' >> Create([1])
+ side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
results = main_input | FlatMap(
'test',
lambda x, dct1, dct2: [[x, dct1, dct2]],
@@ -383,10 +383,10 @@ class DataflowTest(unittest.TestCase):
return existing_windows
pipeline = Pipeline('DirectPipelineRunner')
- numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+ numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
result = (numbers
- | WindowInto('W', windowfn=TestWindowFn())
- | GroupByKey('G'))
+ | 'W' >> WindowInto(windowfn=TestWindowFn())
+ | 'G' >> GroupByKey())
assert_that(
result, equal_to([(1, [10]), (1, [10]), (2, [20]),
(2, [20]), (3, [30]), (3, [30])]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 0f1e96e..b68bc56 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,12 +45,12 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
- | beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
- | TopPerPrefix('TopPerPrefix', 5)
+ | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'TopPerPrefix' >> TopPerPrefix(5)
| beam.Map('format',
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 84f947b..18d0511 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
def test_top_prefixes(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('create', self.WORDS)
- result = words | autocomplete.TopPerPrefix('test', 5)
+ words = p | 'create' >> beam.Create(self.WORDS)
+ result = words | 'test' >> autocomplete.TopPerPrefix(5)
# values must be hashable for now
result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
assert_that(result, equal_to(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 09faecf..ef9f8cc 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -96,9 +96,9 @@ class EstimatePiTransform(beam.PTransform):
def apply(self, pcoll):
# A hundred work items of a hundred thousand tries each.
return (pcoll
- | beam.Create('Initialize', [100000] * 100).with_output_types(int)
- | beam.Map('Run trials', run_trials)
- | beam.CombineGlobally('Sum', combine_results).without_defaults())
+ | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
+ | 'Run trials' >> beam.Map(run_trials)
+ | 'Sum' >> beam.CombineGlobally(combine_results).without_defaults())
def run(argv=None):
@@ -115,7 +115,7 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | EstimatePiTransform('Estimate')
+ | 'Estimate' >> EstimatePiTransform()
| beam.io.Write('Write',
beam.io.TextFileSink(known_args.output,
coder=JsonCoder())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index c633bb1..3967ed5 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase):
def test_basics(self):
p = beam.Pipeline('DirectPipelineRunner')
- result = p | estimate_pi.EstimatePiTransform('Estimate')
+ result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
# Note: Probabilistically speaking this test can fail with a probability
# that is very small (VERY) given that we run at least 10 million trials.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 2bc37e6..56696c3 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -50,7 +50,7 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations):
yield (x, y)
julia_set_colors = (pipeline
- | beam.Create('add points', point_set(n))
+ | 'add points' >> beam.Create(point_set(n))
| beam.Map(
get_julia_set_point_color, c, n, max_iterations))
@@ -105,11 +105,11 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates to
# the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
- (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
- | beam.GroupByKey('x coord') | beam.Map(
+ (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord' >> beam.GroupByKey() | beam.Map(
'format',
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
# pylint: enable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index ef58cc0..043d5f6 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,7 +42,7 @@ def read_documents(pipeline, uris):
pipeline
| beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
| beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
- return pcolls | beam.Flatten('flatten read pcolls')
+ return pcolls | 'flatten read pcolls' >> beam.Flatten()
class TfIdf(beam.PTransform):
@@ -59,9 +59,9 @@ class TfIdf(beam.PTransform):
# PCollection to use as side input.
total_documents = (
uri_to_content
- | beam.Keys('get uris')
- | beam.RemoveDuplicates('get unique uris')
- | beam.combiners.Count.Globally(' count uris'))
+ | 'get uris' >> beam.Keys()
+ | 'get unique uris' >> beam.RemoveDuplicates()
+ | ' count uris' >> beam.combiners.Count.Globally())
# Create a collection of pairs mapping a URI to each of the words
# in the document associated with that URI.
@@ -71,28 +71,28 @@ class TfIdf(beam.PTransform):
uri_to_words = (
uri_to_content
- | beam.FlatMap('split words', split_into_words))
+ | 'split words' >> beam.FlatMap(split_into_words))
# Compute a mapping from each word to the total number of documents
# in which it appears.
word_to_doc_count = (
uri_to_words
- | beam.RemoveDuplicates('get unique words per doc')
- | beam.Values('get words')
- | beam.combiners.Count.PerElement('count docs per word'))
+ | 'get unique words per doc' >> beam.RemoveDuplicates()
+ | 'get words' >> beam.Values()
+ | 'count docs per word' >> beam.combiners.Count.PerElement())
# Compute a mapping from each URI to the total number of words in the
# document associated with that URI.
uri_to_word_total = (
uri_to_words
- | beam.Keys(' get uris')
- | beam.combiners.Count.PerElement('count words in doc'))
+ | ' get uris' >> beam.Keys()
+ | 'count words in doc' >> beam.combiners.Count.PerElement())
# Count, for each (URI, word) pair, the number of occurrences of that word
# in the document associated with the URI.
uri_and_word_to_count = (
uri_to_words
- | beam.combiners.Count.PerElement('count word-doc pairs'))
+ | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
# Adjust the above collection to a mapping from (URI, word) pairs to counts
# into an isomorphic mapping from URI to (word, count) pairs, to prepare
@@ -116,7 +116,7 @@ class TfIdf(beam.PTransform):
# ... ]}
uri_to_word_and_count_and_total = (
{'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
- | beam.CoGroupByKey('cogroup by uri'))
+ | 'cogroup by uri' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, term frequency) pair for each
# URI. A word's term frequency for a document is simply the number of times
@@ -132,7 +132,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tf = (
uri_to_word_and_count_and_total
- | beam.FlatMap('compute term frequencies', compute_term_frequency))
+ | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
# Compute a mapping from each word to its document frequency.
# A word's document frequency in a corpus is the number of
@@ -155,7 +155,7 @@ class TfIdf(beam.PTransform):
# each keyed on the word.
word_to_uri_and_tf_and_df = (
{'tf': word_to_uri_and_tf, 'df': word_to_df}
- | beam.CoGroupByKey('cogroup words by tf-df'))
+ | 'cogroup words by tf-df' >> beam.CoGroupByKey())
# Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
# There are a variety of definitions of TF-IDF
@@ -170,7 +170,7 @@ class TfIdf(beam.PTransform):
word_to_uri_and_tfidf = (
word_to_uri_and_tf_and_df
- | beam.FlatMap('compute tf-idf', compute_tf_idf))
+ | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
return word_to_uri_and_tfidf
@@ -197,7 +197,7 @@ def run(argv=None):
output = pcoll | TfIdf()
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 8f52611..ee7e534 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | beam.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values. To actually
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index c46bfc5..7468484 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -134,9 +134,9 @@ class ComputeTopSessions(beam.PTransform):
| beam.Filter(lambda x: (abs(hash(x)) <=
MAX_TIMESTAMP * self.sampling_threshold))
| ComputeSessions()
- | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+ | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn())
| TopPerMonth()
- | beam.ParDo('FormatOutput', FormatOutputDoFn()))
+ | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn()))
def run(argv=None):
@@ -168,9 +168,9 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | beam.Read('read', beam.io.TextFileSource(known_args.input))
+ | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
| ComputeTopSessions(known_args.sampling_threshold)
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index fb48641..207d6c4 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
def test_compute_top_sessions(self):
p = beam.Pipeline('DirectPipelineRunner')
- edits = p | beam.Create('create', self.EDITS)
+ edits = p | 'create' >> beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
beam.assert_that(result, beam.equal_to(self.EXPECTED))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 7c420fb..650a886 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -111,8 +111,8 @@ def run(argv=None):
}
# pylint: disable=expression-not-assigned
- record_ids = p | beam.Create('CreateIDs', ['1', '2', '3', '4', '5'])
- records = record_ids | beam.Map('CreateRecords', create_random_record)
+ record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+ records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | beam.io.Write(
'write',
beam.io.BigQuerySink(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e1d9cf1..1db4a1e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -105,9 +105,9 @@ def run(argv=None):
beam.io.BigQuerySource(query=query_corpus))
pcoll_word = p | beam.Read('read words',
beam.io.BigQuerySource(query=query_word))
- pcoll_ignore_corpus = p | beam.Create('create_ignore_corpus', [ignore_corpus])
- pcoll_ignore_word = p | beam.Create('create_ignore_word', [ignore_word])
- pcoll_group_ids = p | beam.Create('create groups', group_ids)
+ pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+ pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+ pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
pcoll_ignore_corpus, pcoll_ignore_word)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index bc75c41..215aafa 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -29,16 +29,16 @@ class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
p = beam.Pipeline('DirectPipelineRunner')
- group_ids_pcoll = p | beam.Create('create_group_ids', ['A', 'B', 'C'])
+ group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
corpus_pcoll = p | beam.Create('create_corpus',
[{'f': 'corpus1'},
{'f': 'corpus2'},
{'f': 'corpus3'}])
- words_pcoll = p | beam.Create('create_words', [{'f': 'word1'},
+ words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
{'f': 'word2'},
{'f': 'word3'}])
- ignore_corpus_pcoll = p | beam.Create('create_ignore_corpus', ['corpus1'])
- ignore_word_pcoll = p | beam.Create('create_ignore_word', ['word1'])
+ ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
+ ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
words_pcoll, ignore_corpus_pcoll,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index e732309..cdaee36 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -56,8 +56,8 @@ def count_tornadoes(input_data):
| beam.FlatMap(
'months with tornadoes',
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
- | beam.CombinePerKey('monthly count', sum)
- | beam.Map('format', lambda (k, v): {'month': k, 'tornado_count': v}))
+ | 'monthly count' >> beam.CombinePerKey(sum)
+ | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
def run(argv=None):
@@ -77,7 +77,7 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
# Read the table rows into a PCollection.
- rows = p | beam.io.Read('read', beam.io.BigQuerySource(known_args.input))
+ rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
counts = count_tornadoes(rows)
# Write the output using a "Write" transform that has side effects.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2547849..87e1f44 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -28,7 +28,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
def test_basics(self):
p = beam.Pipeline('DirectPipelineRunner')
- rows = (p | beam.Create('create', [
+ rows = (p | 'create' >> beam.Create([
{'month': 1, 'day': 1, 'tornado': False},
{'month': 1, 'day': 2, 'tornado': True},
{'month': 1, 'day': 3, 'tornado': True},
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index cde00b3..c29a038 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,30 +59,30 @@ def run(argv=None):
# Count the occurrences of each word.
output = (lines
- | beam.Map('split', lambda x: (x[:10], x[10:99])
+ | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
).with_output_types(beam.typehints.KV[str, str])
- | beam.GroupByKey('group')
+ | 'group' >> beam.GroupByKey()
| beam.FlatMap(
'format',
lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
# Write the output using a "Write" transform that has side effects.
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Optionally write the input and output checksums.
if known_args.checksum_output:
input_csum = (lines
- | beam.Map('input-csum', crc32line)
- | beam.CombineGlobally('combine-input-csum', sum)
- | beam.Map('hex-format', lambda x: '%x' % x))
+ | 'input-csum' >> beam.Map(crc32line)
+ | 'combine-input-csum' >> beam.CombineGlobally(sum)
+ | 'hex-format' >> beam.Map(lambda x: '%x' % x))
input_csum | beam.io.Write(
'write-input-csum',
beam.io.TextFileSink(known_args.checksum_output + '-input'))
output_csum = (output
- | beam.Map('output-csum', crc32line)
- | beam.CombineGlobally('combine-output-csum', sum)
- | beam.Map('hex-format-output', lambda x: '%x' % x))
+ | 'output-csum' >> beam.Map(crc32line)
+ | 'combine-output-csum' >> beam.CombineGlobally(sum)
+ | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
output_csum | beam.io.Write(
'write-output-csum',
beam.io.TextFileSink(known_args.checksum_output + '-output'))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 1ce1fa5..bbe02b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -89,7 +89,7 @@ def run(argv=None):
(p # pylint: disable=expression-not-assigned
| beam.io.Read('read',
beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
- | beam.FlatMap('points', compute_points)
+ | 'points' >> beam.FlatMap(compute_points)
| beam.CombinePerKey(sum)
| beam.io.Write('write',
beam.io.TextFileSink(known_args.output, coder=JsonCoder())))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 5840081..75b78c8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,9 +35,9 @@ class CodersTest(unittest.TestCase):
def test_compute_points(self):
p = beam.Pipeline('DirectPipelineRunner')
- records = p | beam.Create('create', self.SAMPLE_RECORDS)
+ records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
result = (records
- | beam.FlatMap('points', coders.compute_points)
+ | 'points' >> beam.FlatMap(coders.compute_points)
| beam.CombinePerKey(sum))
assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index d3d8b08..021eff6 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -57,7 +57,7 @@ def Count2(pcoll): # pylint: disable=invalid-name
"""Count as a decorated function."""
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -84,7 +84,7 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name
"""
return (
pcoll
- | beam.Map('Init', lambda v: (v, factor))
+ | 'Init' >> beam.Map(lambda v: (v, factor))
| beam.CombinePerKey(sum))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 3c0c6f3..603742f 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
def run_pipeline(self, count_implementation, factor=1):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+ words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index c309941..b19b566 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -59,14 +59,14 @@ def filter_cold_days(input_data, month_filter):
# Compute the global mean temperature.
global_mean = AsSingleton(
fields_of_interest
- | beam.Map('extract mean', lambda row: row['mean_temp'])
- | beam.combiners.Mean.Globally('global mean'))
+ | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
+ | 'global mean' >> beam.combiners.Mean.Globally())
# Filter to the rows representing days in the month of interest
# in which the mean daily temperature is below the global mean.
return (
fields_of_interest
- | beam.Filter('desired month', lambda row: row['month'] == month_filter)
+ | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
| beam.Filter('below mean',
lambda row, mean: row['mean_temp'] < mean, global_mean))
@@ -88,11 +88,11 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
- input_data = p | beam.Read('input', beam.io.BigQuerySource(known_args.input))
+ input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
# pylint: disable=expression-not-assigned
(filter_cold_days(input_data, known_args.month_filter)
- | beam.io.Write('save to BQ', beam.io.BigQuerySink(
+ | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
known_args.output,
schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index cf1ca7e..9e5592f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -36,7 +36,7 @@ class FiltersTest(unittest.TestCase):
def _get_result_for_month(self, month):
p = beam.Pipeline('DirectPipelineRunner')
- rows = (p | beam.Create('create', self.input_data))
+ rows = (p | 'create' >> beam.Create(self.input_data))
results = filters.filter_cold_days(rows, month)
return results
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 140314e..6c86f61 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,19 @@ def run(argv=sys.argv[1:]):
coders.registry.register_coder(Player, PlayerCoder)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# The get_players function is annotated with a type hint above, so the type
# system knows the output type of the following operation is a key-value pair
# of a Player and an int. Please see the documentation for details on
# types that are inferred automatically as well as other ways to specify
# type hints.
- | beam.Map('get players', get_players)
+ | 'get players' >> beam.Map(get_players)
# The output type hint of the previous step is used to infer that the key
# type of the following operation is the Player type. Since a custom coder
# is registered for the Player class above, a PlayerCoder will be used to
# encode Player objects as keys for this combine operation.
| beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9e6b001..bf6d1b1 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -88,7 +88,7 @@ def run(argv=None, assert_results=None):
known_args.input_snailmail))
# Group together all entries under the same name.
- grouped = (email, phone, snailmail) | beam.CoGroupByKey('group_by_name')
+ grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
# Prepare tab-delimited output; something like this:
# "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
@@ -107,9 +107,9 @@ def run(argv=None, assert_results=None):
nomads = grouped | beam.Filter( # People without addresses.
lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
- num_luddites = luddites | beam.combiners.Count.Globally('luddites')
- num_writers = writers | beam.combiners.Count.Globally('writers')
- num_nomads = nomads | beam.combiners.Count.Globally('nomads')
+ num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
+ num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
+ num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
# Write tab-delimited output.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 5bde591..187d20b 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -114,10 +114,10 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones)))
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
def run(argv=None):
@@ -137,7 +137,7 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
- lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
# with_outputs allows accessing the side outputs of a DoFn.
split_lines_result = (lines
@@ -155,21 +155,21 @@ def run(argv=None):
# pylint: disable=expression-not-assigned
(character_count
- | beam.Map('pair_with_key', lambda x: ('chars_temp_key', x))
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
- | beam.Map('count chars', lambda (_, counts): sum(counts))
+ | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
| beam.Write('write chars',
beam.io.TextFileSink(known_args.output + '-chars')))
# pylint: disable=expression-not-assigned
(short_words
- | CountWords('count short words')
+ | 'count short words' >> CountWords()
| beam.Write('write short words',
beam.io.TextFileSink(known_args.output + '-short-words')))
# pylint: disable=expression-not-assigned
(words
- | CountWords('count words')
+ | 'count words' >> CountWords()
| beam.Write('write words',
beam.io.TextFileSink(known_args.output + '-words')))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3658619..c605db8 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -104,7 +104,7 @@ def construct_pipeline(renames):
# [END pipelines_constructing_applying]
# [START pipelines_constructing_writing]
- filtered_words = reversed_words | beam.Filter('FilterWords', filter_words)
+ filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
filtered_words | beam.io.Write('WriteMyFile',
beam.io.TextFileSink(
'gs://some/outputData.txt'))
@@ -242,8 +242,8 @@ def pipeline_options_remote(argv):
options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
p = Pipeline(options=options)
- lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+ lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -283,8 +283,8 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
- lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+ lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
p = beam.Pipeline(argv=pipeline_args)
lines = p | beam.io.Read('ReadFromText',
beam.io.TextFileSource(known_args.input))
- lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output))
+ lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# [END pipeline_options_command_line]
p.run()
@@ -344,8 +344,8 @@ def pipeline_logging(lines, output):
p = beam.Pipeline(options=PipelineOptions())
(p
| beam.Create(lines)
- | beam.ParDo('ExtractWords', ExtractWordsFn())
- | beam.io.Write('WriteToText', beam.io.TextFileSink(output)))
+ | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
+ | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
p.run()
@@ -391,11 +391,11 @@ def pipeline_monitoring(renames):
def apply(self, pcoll):
return (pcoll
# Convert lines of text into individual words.
- | beam.ParDo('ExtractWords', ExtractWordsFn())
+ | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
# Count the number of times each word occurs.
| beam.combiners.Count.PerElement()
# Format each word and count into a printable string.
- | beam.ParDo('FormatCounts', FormatCountsFn()))
+ | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
# [END pipeline_monitoring_composite]
pipeline_options = PipelineOptions()
@@ -405,11 +405,11 @@ def pipeline_monitoring(renames):
# [START pipeline_monitoring_execution]
(p
# Read the lines of the input text.
- | beam.io.Read('ReadLines', beam.io.TextFileSource(options.input))
+ | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
# Count the words.
| CountWords()
# Write the formatted word counts to output.
- | beam.io.Write('WriteCounts', beam.io.TextFileSink(options.output)))
+ | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
# [END pipeline_monitoring_execution]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -454,7 +454,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_read]
# [START examples_wordcount_minimal_pardo]
- | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
# [END examples_wordcount_minimal_pardo]
# [START examples_wordcount_minimal_count]
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
- | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
# [END examples_wordcount_minimal_write]
)
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
formatted = counts | beam.ParDo(FormatAsTextFn())
# [END examples_wordcount_wordcount_dofn]
- formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
p.visit(SnippetUtils.RenameFiles(renames))
p.run()
@@ -591,9 +591,9 @@ def examples_wordcount_debugging(renames):
p
| beam.io.Read(beam.io.TextFileSource(
'gs://dataflow-samples/shakespeare/kinglear.txt'))
- | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.combiners.Count.PerElement()
- | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# [START example_wordcount_debugging_assert]
beam.assert_that(
@@ -601,7 +601,7 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_assert]
output = (filtered_words
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
| beam.io.Write(
'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
@@ -682,7 +682,7 @@ def model_custom_source(count):
# Using the source in an example pipeline.
# [START model_custom_source_use_new_source]
p = beam.Pipeline(options=PipelineOptions())
- numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+ numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
# [END model_custom_source_use_new_source]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -712,7 +712,7 @@ def model_custom_source(count):
# [START model_custom_source_use_ptransform]
p = beam.Pipeline(options=PipelineOptions())
- numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+ numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)
# [END model_custom_source_use_ptransform]
lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -848,7 +848,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
# [START model_custom_sink_use_ptransform]
p = beam.Pipeline(options=PipelineOptions())
- kvs = p | beam.core.Create('CreateKVs', KVs)
+ kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
kvs | WriteToKVSink('WriteToSimpleKV',
'http://url_to_simple_kv/', final_table_name)
# [END model_custom_sink_use_ptransform]
@@ -880,7 +880,7 @@ def model_textio(renames):
# [END model_textio_read]
# [START model_textio_write]
- filtered_words = lines | beam.FlatMap('FilterWords', filter_words)
+ filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
# [START model_pipelineio_write]
filtered_words | beam.io.Write(
'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
@@ -1053,7 +1053,7 @@ def model_group_by_key(contents, output_path):
p
| beam.Create(contents)
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
- | beam.Map('one word', lambda w: (w, 1)))
+ | 'one word' >> beam.Map(lambda w: (w, 1)))
# GroupByKey accepts a PCollection of (w, 1) and
# outputs a PCollection of (w, (1, 1, ...)).
# (A key/value pair is just a tuple in Python.)
@@ -1063,7 +1063,7 @@ def model_group_by_key(contents, output_path):
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(grouped_words
- | beam.Map('count words', lambda (word, counts): (word, len(counts)))
+ | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
| beam.io.Write(beam.io.TextFileSink(output_path)))
p.run()
@@ -1083,8 +1083,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
# multiple possible values for each key.
# The phone_list contains values such as: ('mary': '111-222-3333') with
# multiple possible values for each key.
- emails = p | beam.Create('email', email_list)
- phones = p | beam.Create('phone', phone_list)
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
# The result PCollection contains one key-value element for each key in the
# input PCollections. The key of the pair will be the key from the input and
# the value will be a dictionary with two entries: 'emails' - an iterable of
@@ -1119,9 +1119,9 @@ def model_join_using_side_inputs(
# This code performs a join by receiving the set of names as an input and
# passing PCollections that contain emails and phone numbers as side inputs
# instead of using CoGroupByKey.
- names = p | beam.Create('names', name_list)
- emails = p | beam.Create('email', email_list)
- phones = p | beam.Create('phone', phone_list)
+ names = p | 'names' >> beam.Create(name_list)
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
def join_info(name, emails, phone_numbers):
filtered_emails = []
@@ -1149,7 +1149,7 @@ def model_join_using_side_inputs(
class Keys(beam.PTransform):
def apply(self, pcoll):
- return pcoll | beam.Map('Keys', lambda (k, v): k)
+ return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
# [END model_library_transforms_keys]
# pylint: enable=invalid-name
@@ -1160,6 +1160,6 @@ class Count(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | beam.Map('Init', lambda v: (v, 1))
+ | 'Init' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7888263..9eba46a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -103,14 +103,14 @@ class ParDoTest(unittest.TestCase):
def test_pardo_with_label(self):
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
- result = words | beam.Map('CountUniqueLetters', lambda word: len(set(word)))
+ result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
# [END model_pardo_with_label]
self.assertEqual({1, 2, 4}, set(result))
def test_pardo_side_input(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | beam.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+ words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
# [START model_pardo_side_input]
# Callable takes additional arguments.
@@ -124,11 +124,11 @@ class ParDoTest(unittest.TestCase):
| beam.CombineGlobally(beam.combiners.MeanCombineFn()))
# Call with explicit side inputs.
- small_words = words | beam.FlatMap('small', filter_using_length, 0, 3)
+ small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
# A single deferred side input.
larger_than_average = (words
- | beam.FlatMap('large', filter_using_length,
+ | 'large' >> beam.FlatMap(filter_using_length,
lower_bound=pvalue.AsSingleton(
avg_word_len)))
@@ -268,7 +268,7 @@ class TypeHintsTest(unittest.TestCase):
evens = numbers | beam.ParDo(FilterEvensDoFn())
# [END type_hints_do_fn]
- words = p | beam.Create('words', ['a', 'bb', 'c'])
+ words = p | 'words' >> beam.Create(['a', 'bb', 'c'])
# One can assert outputs and apply them to transforms as well.
# Helps document the contract and checks it at pipeline construction time.
# [START type_hints_transform]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index 7148e58..ef95a5f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -49,7 +49,7 @@ def run(argv=None):
# Capitalize the characters in each line.
transformed = (lines
- | (beam.Map('capitalize', lambda x: x.upper())))
+ | 'capitalize' >> (beam.Map(lambda x: x.upper())))
# Write to PubSub.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index eda74dd..35c1abb 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -55,11 +55,11 @@ def run(argv=None):
| (beam.FlatMap('split',
lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones)))
- | beam.Map('format', lambda tup: '%s: %d' % tup))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
# Write to PubSub.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index bbfd43e..4744352 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -77,22 +77,22 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# Count the occurrences of each word.
counts = (lines
- | (beam.ParDo('split', WordExtractingDoFn())
+ | 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
# Format the counts into a PCollection of strings.
- output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Actually run the pipeline (all operations above are deferred).
result = p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 74effed..e008b48 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -97,11 +97,11 @@ class CountWords(beam.PTransform):
def apply(self, pcoll):
return (pcoll
- | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
def run(argv=None):
@@ -126,9 +126,9 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection, count the occurrences of
# each word and filter by a list of words.
filtered_words = (
- p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
| CountWords()
- | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
# assert_that is a convenient PTransform that checks a PCollection has an
# expected value. Asserts are best used in unit tests with small data sets but
@@ -145,8 +145,8 @@ def run(argv=None):
# "Write" transform that has side effects.
# pylint: disable=unused-variable
output = (filtered_words
- | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
- | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c3c41d7..ce5b644 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -93,22 +93,22 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
# Count the occurrences of each word.
counts = (lines
- | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
- | beam.Map('pair_with_one', lambda x: (x, 1))
- | beam.GroupByKey('group')
- | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
# Format the counts into a PCollection of strings.
- output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+ output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 4d1b245..7ad3842 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -43,7 +43,7 @@ class ReadFromAvro(PTransform):
files, a ``PCollection`` for the records in these Avro files can be created
in the following manner.
p = df.Pipeline(argv=pipeline_args)
- records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+ records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
Each record of this ``PCollection`` will contain a single record read from a
source. Records that are of simple types will be mapped into corresponding
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f2c56dc..f789312 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
of the side table. The execution framework may use some caching techniques to
share the side inputs between calls in order to avoid excessive reading::
- main_table = pipeline | beam.io.Read(beam.io.BigQuerySource('very_big_table')
- side_table = pipeline | beam.io.Read(beam.io.BigQuerySource('not_big_table')
+ main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource('very_big_table'))
+ side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource('not_big_table'))
results = (
main_table
| beam.Map('process data',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c7837ec..1bf51b2 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -255,7 +255,7 @@ class TestFileBasedSource(unittest.TestCase):
file_name, expected_data = _write_data(100)
assert len(expected_data) == 100
pipeline = beam.Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Read('Read', LineSource(file_name))
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
@@ -263,7 +263,7 @@ class TestFileBasedSource(unittest.TestCase):
pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
assert len(expected_data) == 200
pipeline = beam.Pipeline('DirectPipelineRunner')
- pcoll = pipeline | beam.Read('Read', LineSource(pattern))
+ pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
assert_that(pcoll, equal_to(expected_data))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index de5e9d4..b683eb2 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -990,7 +990,7 @@ class Write(ptransform.PTransform):
from apache_beam.io import iobase
if isinstance(self.sink, iobase.NativeSink):
# A native sink
- return pcoll | _NativeWrite('native_write', self.sink)
+ return pcoll | 'native_write' >> _NativeWrite(self.sink)
elif isinstance(self.sink, iobase.Sink):
# A custom sink
return pcoll | WriteImpl(self.sink)
@@ -1010,7 +1010,7 @@ class WriteImpl(ptransform.PTransform):
self.sink = sink
def apply(self, pcoll):
- do_once = pcoll.pipeline | core.Create('DoOnce', [None])
+ do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
init_result_coll = do_once | core.Map(
'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
if getattr(self.sink, 'num_shards', 0):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 86ae45f..39816c0 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -69,7 +69,7 @@ class PipelineTest(unittest.TestCase):
@staticmethod
def custom_callable(pcoll):
- return pcoll | FlatMap('+1', lambda x: [x + 1])
+ return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
# Some of these tests designate a runner by name, others supply a runner.
# This variation is just to verify that both means of runner specification
@@ -78,7 +78,7 @@ class PipelineTest(unittest.TestCase):
class CustomTransform(PTransform):
def apply(self, pcoll):
- return pcoll | FlatMap('+1', lambda x: [x + 1])
+ return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
class Visitor(PipelineVisitor):
@@ -98,33 +98,33 @@ class PipelineTest(unittest.TestCase):
def test_create(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('label1', [1, 2, 3])
+ pcoll = pipeline | 'label1' >> Create([1, 2, 3])
assert_that(pcoll, equal_to([1, 2, 3]))
# Test if initial value is an iterator object.
- pcoll2 = pipeline | Create('label2', iter((4, 5, 6)))
- pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10])
+ pcoll2 = pipeline | 'label2' >> Create(iter((4, 5, 6)))
+ pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
pipeline.run()
def test_create_singleton_pcollection(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('label', [[1, 2, 3]])
+ pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
assert_that(pcoll, equal_to([[1, 2, 3]]))
pipeline.run()
def test_read(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Read('read', FakeSource([1, 2, 3]))
+ pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
assert_that(pcoll, equal_to([1, 2, 3]))
pipeline.run()
def test_visit_entire_graph(self):
pipeline = Pipeline(self.runner_name)
- pcoll1 = pipeline | Create('pcoll', [1, 2, 3])
- pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1])
- pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1])
- pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1])
+ pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+ pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
+ pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
+ pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
transform = PipelineTest.CustomTransform()
pcoll5 = pcoll4 | transform
@@ -140,15 +140,15 @@ class PipelineTest(unittest.TestCase):
def test_apply_custom_transform(self):
pipeline = Pipeline(self.runner_name)
- pcoll = pipeline | Create('pcoll', [1, 2, 3])
+ pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
result = pcoll | PipelineTest.CustomTransform()
assert_that(result, equal_to([2, 3, 4]))
pipeline.run()
def test_reuse_custom_transform_instance(self):
pipeline = Pipeline(self.runner_name)
- pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
- pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+ pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
+ pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
transform = PipelineTest.CustomTransform()
pcoll1 | transform
with self.assertRaises(RuntimeError) as cm:
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
self.assertEqual(
['a-x', 'b-x', 'c-x'],
- sorted(['a', 'b', 'c'] | AddSuffix('-x')))
+ sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
def test_cached_pvalues_are_refcounted(self):
"""Test that cached PValues are refcounted and deleted.
@@ -213,17 +213,17 @@ class PipelineTest(unittest.TestCase):
gc.collect()
count_threshold = len(gc.get_objects()) + 10000
- biglist = pipeline | Create('oom:create', ['x'] * num_elements)
+ biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements)
dupes = (
biglist
- | Map('oom:addone', lambda x: (x, 1))
- | FlatMap('oom:dupes', create_dupes,
+ | 'oom:addone' >> Map(lambda x: (x, 1))
+ | 'oom:dupes' >> FlatMap(create_dupes,
AsIter(biglist)).with_outputs('side', main='main'))
result = (
(dupes.side, dupes.main, dupes.side)
- | Flatten('oom:flatten')
- | CombinePerKey('oom:combine', sum)
- | Map('oom:check', check_memory, count_threshold))
+ | 'oom:flatten' >> Flatten()
+ | 'oom:combine' >> CombinePerKey(sum)
+ | 'oom:check' >> Map(check_memory, count_threshold))
assert_that(result, equal_to([('x', 3 * num_elements)]))
pipeline.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index bb742e0..323ca33 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -45,9 +45,9 @@ class PValueTest(unittest.TestCase):
def test_pcollectionview_not_recreated(self):
pipeline = Pipeline('DirectPipelineRunner')
- value = pipeline | Create('create1', [1, 2, 3])
- value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
- value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
+ value = pipeline | 'create1' >> Create([1, 2, 3])
+ value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
+ value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
self.assertEqual(AsSingleton(value), AsSingleton(value))
self.assertEqual(AsSingleton('new', value, default_value=1),
AsSingleton('new', value, default_value=1))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
index b3b2968..3cd8d73 100644
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
@@ -103,8 +103,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
pvalue.ListPCollectionView))
def test_co_group_by_key(self):
- emails = self.pipeline | Create('email', [('joe', 'joe@example.com')])
- phones = self.pipeline | Create('phone', [('mary', '111-222-3333')])
+ emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
+ phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
{'emails': emails, 'phones': phones} | CoGroupByKey()
self.pipeline.visit(self.visitor)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 2863756..04de7fb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -60,9 +60,9 @@ class RunnerTest(unittest.TestCase):
'--no_auth=True'
]))
- (p | ptransform.Create('create', [1, 2, 3]) # pylint: disable=expression-not-assigned
- | ptransform.FlatMap('do', lambda x: [(x, x)])
- | ptransform.GroupByKey('gbk'))
+ (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
+ | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
+ | 'gbk' >> ptransform.GroupByKey())
remote_runner.job = apiclient.Job(p.options)
super(DataflowPipelineRunner, remote_runner).run(p)