You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:17 UTC
[11/12] incubator-beam git commit: Final cleanup pass.
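Final cleanup pass. Converts the remaining label-as-first-argument transform calls in the Python examples to the 'label' >> transform syntax, drops redundant step labels, and renames the 'Init' step in the Count examples.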
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe
Branch: refs/heads/python-sdk
Commit: e3c078fe28553b7e7317316b6df51b4c570573ba
Parents: c5b5b14
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:32:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 4 ++--
.../examples/complete/autocomplete_test.py | 4 ++--
.../apache_beam/examples/complete/estimate_pi.py | 5 ++---
.../examples/complete/top_wikipedia_sessions.py | 4 ++--
.../complete/top_wikipedia_sessions_test.py | 2 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 12 +++++-------
.../examples/cookbook/custom_ptransform.py | 9 +++++----
.../examples/cookbook/custom_ptransform_test.py | 2 +-
.../apache_beam/examples/cookbook/filters.py | 4 ++--
.../examples/cookbook/group_with_coder.py | 9 +++++----
.../examples/cookbook/multiple_output_pardo.py | 14 +++++++-------
.../apache_beam/examples/snippets/snippets.py | 19 +++++++++----------
.../apache_beam/examples/streaming_wordcap.py | 4 ++--
13 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 10d9009..c3cd88f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -48,8 +48,8 @@ def run(argv=None):
| 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
- | beam.Map('format',
- lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+ | 'format' >> beam.Map(
+ lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 18d0511..0d20482 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
def test_top_prefixes(self):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | 'create' >> beam.Create(self.WORDS)
- result = words | 'test' >> autocomplete.TopPerPrefix(5)
+ words = p | beam.Create(self.WORDS)
+ result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
assert_that(result, equal_to(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c33db1d..37c1aad 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -112,9 +112,8 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | 'Estimate' >> EstimatePiTransform()
- | beam.io.Write('Write',
- beam.io.TextFileSink(known_args.output,
+ | EstimatePiTransform()
+ | beam.io.Write(beam.io.TextFileSink(known_args.output,
coder=JsonCoder())))
# Actually run the pipeline (all operations above are deferred).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7468484..a48a383 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -168,9 +168,9 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+ | beam.Read(beam.io.TextFileSource(known_args.input))
| ComputeTopSessions(known_args.sampling_threshold)
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 207d6c4..a84cc78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
def test_compute_top_sessions(self):
p = beam.Pipeline('DirectPipelineRunner')
- edits = p | 'create' >> beam.Create(self.EDITS)
+ edits = p | beam.Create(self.EDITS)
result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
beam.assert_that(result, beam.equal_to(self.EXPECTED))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index f7070dc..a076a0c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -54,8 +54,8 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read(
- 'read', beam.io.TextFileSource(known_args.input,
- coder=beam.coders.BytesCoder()))
+ beam.io.TextFileSource(known_args.input,
+ coder=beam.coders.BytesCoder()))
# Count the occurrences of each word.
output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
# Write the output using a "Write" transform that has side effects.
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | beam.io.Write(beam.io.TextFileSink(known_args.output))
# Optionally write the input and output checksums.
if known_args.checksum_output:
@@ -76,16 +76,14 @@ def run(argv=None):
| 'input-csum' >> beam.Map(crc32line)
| 'combine-input-csum' >> beam.CombineGlobally(sum)
| 'hex-format' >> beam.Map(lambda x: '%x' % x))
- input_csum | beam.io.Write(
- 'write-input-csum',
+ input_csum | 'write-input-csum' >> beam.io.Write(
beam.io.TextFileSink(known_args.checksum_output + '-input'))
output_csum = (output
| 'output-csum' >> beam.Map(crc32line)
| 'combine-output-csum' >> beam.CombineGlobally(sum)
| 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
- output_csum | beam.io.Write(
- 'write-output-csum',
+ output_csum | 'write-output-csum' >> beam.io.Write(
beam.io.TextFileSink(known_args.checksum_output + '-output'))
# Actually run the pipeline (all operations above are deferred).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 021eff6..ca13bbf 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -47,7 +47,8 @@ def run_count1(known_args, options):
"""Runs the first example pipeline."""
logging.info('Running first pipeline')
p = beam.Pipeline(options=options)
- (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
+ (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | Count1()
| beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
@@ -57,7 +58,7 @@ def Count2(pcoll): # pylint: disable=invalid-name
"""Count as a decorated function."""
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
@@ -84,7 +85,7 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name
"""
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, factor))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, factor))
| beam.CombinePerKey(sum))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 603742f..806b031 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
def run_pipeline(self, count_implementation, factor=1):
p = beam.Pipeline('DirectPipelineRunner')
- words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+ words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
result = words | count_implementation
assert_that(
result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index efd0ba7..b3a969a 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -67,8 +67,8 @@ def filter_cold_days(input_data, month_filter):
return (
fields_of_interest
| 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
- | beam.Filter('below mean',
- lambda row, mean: row['mean_temp'] < mean, global_mean))
+ | 'below mean' >> beam.Filter(
+ lambda row, mean: row['mean_temp'] < mean, global_mean))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6c86f61..651a4f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,20 @@ def run(argv=sys.argv[1:]):
coders.registry.register_coder(Player, PlayerCoder)
(p # pylint: disable=expression-not-assigned
- | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | beam.io.Read(beam.io.TextFileSource(known_args.input))
# The get_players function is annotated with a type hint above, so the type
# system knows the output type of the following operation is a key-value pair
# of a Player and an int. Please see the documentation for details on
# types that are inferred automatically as well as other ways to specify
# type hints.
- | 'get players' >> beam.Map(get_players)
+ | beam.Map(get_players)
# The output type hint of the previous step is used to infer that the key
# type of the following operation is the Player type. Since a custom coder
# is registered for the Player class above, a PlayerCoder will be used to
# encode Player objects as keys for this combine operation.
- | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | beam.CombinePerKey(sum)
+ | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+ | beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 187d20b..d24170e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -137,7 +137,7 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
- lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+ lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
# with_outputs allows accessing the side outputs of a DoFn.
split_lines_result = (lines
@@ -158,20 +158,20 @@ def run(argv=None):
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
| 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
- | beam.Write('write chars',
- beam.io.TextFileSink(known_args.output + '-chars')))
+ | 'write chars' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-chars')))
# pylint: disable=expression-not-assigned
(short_words
| 'count short words' >> CountWords()
- | beam.Write('write short words',
- beam.io.TextFileSink(known_args.output + '-short-words')))
+ | 'write short words' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-short-words')))
# pylint: disable=expression-not-assigned
(words
| 'count words' >> CountWords()
- | beam.Write('write words',
- beam.io.TextFileSink(known_args.output + '-words')))
+ | 'write words' >> beam.Write(
+ beam.io.TextFileSink(known_args.output + '-words')))
p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9f3d6e1..891f464 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -105,9 +105,8 @@ def construct_pipeline(renames):
# [START pipelines_constructing_writing]
filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
- filtered_words | beam.io.Write('WriteMyFile',
- beam.io.TextFileSink(
- 'gs://some/outputData.txt'))
+ filtered_words | 'WriteMyFile' >> beam.io.Write(
+ beam.io.TextFileSink('gs://some/outputData.txt'))
# [END pipelines_constructing_writing]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -242,8 +241,8 @@ def pipeline_options_remote(argv):
options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
p = Pipeline(options=options)
- lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -283,8 +282,8 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
- lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
- lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+ lines | beam.io.Write(beam.io.TextFileSink(my_output))
p.run()
@@ -344,8 +343,8 @@ def pipeline_logging(lines, output):
p = beam.Pipeline(options=PipelineOptions())
(p
| beam.Create(lines)
- | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
- | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
+ | beam.ParDo(ExtractWordsFn())
+ | beam.io.Write(beam.io.TextFileSink(output)))
p.run()
@@ -1160,6 +1159,6 @@ class Count(beam.PTransform):
def apply(self, pcoll):
return (
pcoll
- | 'Init' >> beam.Map(lambda v: (v, 1))
+ | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
| beam.CombinePerKey(sum))
# [END model_library_transforms_count]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index ef95a5f..d25ec3e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -45,7 +45,7 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection.
lines = p | beam.io.Read(
- 'read', beam.io.PubSubSource(known_args.input_topic))
+ beam.io.PubSubSource(known_args.input_topic))
# Capitalize the characters in each line.
transformed = (lines
@@ -54,7 +54,7 @@ def run(argv=None):
# Write to PubSub.
# pylint: disable=expression-not-assigned
transformed | beam.io.Write(
- 'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
+ beam.io.PubSubSink(known_args.output_topic))
p.run()