You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:17 UTC

[11/12] incubator-beam git commit: Final cleanup pass.

Final cleanup pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe

Branch: refs/heads/python-sdk
Commit: e3c078fe28553b7e7317316b6df51b4c570573ba
Parents: c5b5b14
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:32:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py            |  4 ++--
 .../examples/complete/autocomplete_test.py       |  4 ++--
 .../apache_beam/examples/complete/estimate_pi.py |  5 ++---
 .../examples/complete/top_wikipedia_sessions.py  |  4 ++--
 .../complete/top_wikipedia_sessions_test.py      |  2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py  | 12 +++++-------
 .../examples/cookbook/custom_ptransform.py       |  9 +++++----
 .../examples/cookbook/custom_ptransform_test.py  |  2 +-
 .../apache_beam/examples/cookbook/filters.py     |  4 ++--
 .../examples/cookbook/group_with_coder.py        |  9 +++++----
 .../examples/cookbook/multiple_output_pardo.py   | 14 +++++++-------
 .../apache_beam/examples/snippets/snippets.py    | 19 +++++++++----------
 .../apache_beam/examples/streaming_wordcap.py    |  4 ++--
 13 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 10d9009..c3cd88f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -48,8 +48,8 @@ def run(argv=None):
    | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
    | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'TopPerPrefix' >> TopPerPrefix(5)
-   | beam.Map('format',
-              lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+   | 'format' >> beam.Map(
+       lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
    | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 18d0511..0d20482 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
 
   def test_top_prefixes(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | 'create' >> beam.Create(self.WORDS)
-    result = words | 'test' >> autocomplete.TopPerPrefix(5)
+    words = p | beam.Create(self.WORDS)
+    result = words | autocomplete.TopPerPrefix(5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
     assert_that(result, equal_to(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c33db1d..37c1aad 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -112,9 +112,8 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'Estimate' >> EstimatePiTransform()
-   | beam.io.Write('Write',
-                   beam.io.TextFileSink(known_args.output,
+   | EstimatePiTransform()
+   | beam.io.Write(beam.io.TextFileSink(known_args.output,
                                         coder=JsonCoder())))
 
   # Actually run the pipeline (all operations above are deferred).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7468484..a48a383 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -168,9 +168,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+   | beam.Read(beam.io.TextFileSource(known_args.input))
    | ComputeTopSessions(known_args.sampling_threshold)
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 207d6c4..a84cc78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
 
   def test_compute_top_sessions(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    edits = p | 'create' >> beam.Create(self.EDITS)
+    edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
     beam.assert_that(result, beam.equal_to(self.EXPECTED))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index f7070dc..a076a0c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -54,8 +54,8 @@ def run(argv=None):
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read(
-      'read', beam.io.TextFileSource(known_args.input,
-                                     coder=beam.coders.BytesCoder()))
+      beam.io.TextFileSource(known_args.input,
+                             coder=beam.coders.BytesCoder()))
 
   # Count the occurrences of each word.
   output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
@@ -76,16 +76,14 @@ def run(argv=None):
                   | 'input-csum' >> beam.Map(crc32line)
                   | 'combine-input-csum' >> beam.CombineGlobally(sum)
                   | 'hex-format' >> beam.Map(lambda x: '%x' % x))
-    input_csum | beam.io.Write(
-        'write-input-csum',
+    input_csum | 'write-input-csum' >> beam.io.Write(
         beam.io.TextFileSink(known_args.checksum_output + '-input'))
 
     output_csum = (output
                    | 'output-csum' >> beam.Map(crc32line)
                    | 'combine-output-csum' >> beam.CombineGlobally(sum)
                    | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
-    output_csum | beam.io.Write(
-        'write-output-csum',
+    output_csum | 'write-output-csum' >> beam.io.Write(
         beam.io.TextFileSink(known_args.checksum_output + '-output'))
 
   # Actually run the pipeline (all operations above are deferred).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 021eff6..ca13bbf 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | 'Init' >> beam.Map(lambda v: (v, 1))
+        | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 
 
@@ -47,7 +47,8 @@ def run_count1(known_args, options):
   """Runs the first example pipeline."""
   logging.info('Running first pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
+  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | Count1()
    | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
@@ -57,7 +58,7 @@ def Count2(pcoll):  # pylint: disable=invalid-name
   """Count as a decorated function."""
   return (
       pcoll
-      | 'Init' >> beam.Map(lambda v: (v, 1))
+      | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
       | beam.CombinePerKey(sum))
 
 
@@ -84,7 +85,7 @@ def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
   """
   return (
       pcoll
-      | 'Init' >> beam.Map(lambda v: (v, factor))
+      | 'PairWithOne' >> beam.Map(lambda v: (v, factor))
       | beam.CombinePerKey(sum))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 603742f..806b031 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
 
   def run_pipeline(self, count_implementation, factor=1):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(
         result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index efd0ba7..b3a969a 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -67,8 +67,8 @@ def filter_cold_days(input_data, month_filter):
   return (
       fields_of_interest
       | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
-      | beam.Filter('below mean',
-                    lambda row, mean: row['mean_temp'] < mean, global_mean))
+      | 'below mean' >> beam.Filter(
+          lambda row, mean: row['mean_temp'] < mean, global_mean))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6c86f61..651a4f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,20 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | beam.io.Read(beam.io.TextFileSource(known_args.input))
    # The get_players function is annotated with a type hint above, so the type
    # system knows the output type of the following operation is a key-value pair
    # of a Player and an int. Please see the documentation for details on
    # types that are inferred automatically as well as other ways to specify
    # type hints.
-   | 'get players' >> beam.Map(get_players)
+   | beam.Map(get_players)
    # The output type hint of the previous step is used to infer that the key
    # type of the following operation is the Player type. Since a custom coder
    # is registered for the Player class above, a PlayerCoder will be used to
    # encode Player objects as keys for this combine operation.
-   | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | beam.CombinePerKey(sum)
+   | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 187d20b..d24170e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -137,7 +137,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -158,20 +158,20 @@ def run(argv=None):
    | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
    | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
-   | beam.Write('write chars',
-                beam.io.TextFileSink(known_args.output + '-chars')))
+   | 'write chars' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-chars')))
 
   # pylint: disable=expression-not-assigned
   (short_words
    | 'count short words' >> CountWords()
-   | beam.Write('write short words',
-                beam.io.TextFileSink(known_args.output + '-short-words')))
+   | 'write short words' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-short-words')))
 
   # pylint: disable=expression-not-assigned
   (words
    | 'count words' >> CountWords()
-   | beam.Write('write words',
-                beam.io.TextFileSink(known_args.output + '-words')))
+   | 'write words' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-words')))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9f3d6e1..891f464 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -105,9 +105,8 @@ def construct_pipeline(renames):
 
   # [START pipelines_constructing_writing]
   filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
-  filtered_words | beam.io.Write('WriteMyFile',
-                                 beam.io.TextFileSink(
-                                     'gs://some/outputData.txt'))
+  filtered_words | 'WriteMyFile' >> beam.io.Write(
+      beam.io.TextFileSink('gs://some/outputData.txt'))
   # [END pipelines_constructing_writing]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -242,8 +241,8 @@ def pipeline_options_remote(argv):
   options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
   p = Pipeline(options=options)
 
-  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | beam.io.Write(beam.io.TextFileSink(my_output))
 
   p.run()
 
@@ -283,8 +282,8 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
-  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | beam.io.Write(beam.io.TextFileSink(my_output))
   p.run()
 
 
@@ -344,8 +343,8 @@ def pipeline_logging(lines, output):
   p = beam.Pipeline(options=PipelineOptions())
   (p
    | beam.Create(lines)
-   | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
-   | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
+   | beam.ParDo(ExtractWordsFn())
+   | beam.io.Write(beam.io.TextFileSink(output)))
 
   p.run()
 
@@ -1160,6 +1159,6 @@ class Count(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | 'Init' >> beam.Map(lambda v: (v, 1))
+        | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index ef95a5f..d25ec3e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -45,7 +45,7 @@ def run(argv=None):
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read(
-      'read', beam.io.PubSubSource(known_args.input_topic))
+      beam.io.PubSubSource(known_args.input_topic))
 
   # Capitalize the characters in each line.
   transformed = (lines
@@ -54,7 +54,7 @@ def run(argv=None):
   # Write to PubSub.
   # pylint: disable=expression-not-assigned
   transformed | beam.io.Write(
-      'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
+      beam.io.PubSubSink(known_args.output_topic))
 
   p.run()