You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/12 22:54:53 UTC

[17/18] beam git commit: Change the examples whose long lambdas were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: lambdas that are short enough are kept as lambdas, and those still use indexing (as required).

Change the examples whose long lambdas were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: lambdas that are short enough are kept as lambdas, and those still use indexing (as required).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b0ad58c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b0ad58c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b0ad58c

Branch: refs/heads/master
Commit: 3b0ad58cd68ee0790ac1f10ddffa96a866d85a0c
Parents: cf2be41
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 8 12:56:27 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |  3 +-
 .../examples/complete/game/leader_board.py      |  3 +-
 .../examples/complete/game/user_score.py        |  3 +-
 .../complete/juliaset/juliaset/juliaset.py      |  3 +-
 .../apache_beam/examples/complete/tfidf_test.py |  3 +-
 .../examples/cookbook/datastore_wordcount.py    |  8 ++++--
 .../examples/cookbook/mergecontacts.py          | 30 ++++++++++++--------
 .../examples/cookbook/multiple_output_pardo.py  | 10 ++++---
 .../apache_beam/examples/snippets/snippets.py   | 13 +++++----
 .../apache_beam/examples/streaming_wordcount.py |  3 +-
 .../apache_beam/examples/windowed_wordcount.py  |  3 +-
 sdks/python/apache_beam/examples/wordcount.py   |  8 ++++--
 .../apache_beam/examples/wordcount_debugging.py |  8 ++++--
 .../apache_beam/examples/wordcount_minimal.py   |  5 ++--
 sdks/python/apache_beam/transforms/util.py      |  8 +++---
 15 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c09b78e..b556e65 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -46,7 +46,8 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   with beam.Pipeline(options=pipeline_options) as p:
     def format_result(prefix_candidates):
-      return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
+      (prefix, candidates) = prefix_candidates
+      return '%s: %s' % (prefix, candidates)
 
     (p  # pylint: disable=expression-not-assigned
      | 'read' >> ReadFromText(known_args.input)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index c1b15e9..a5bde05 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -331,7 +331,8 @@ def run(argv=None):
 
     # Get user scores and write the results to BigQuery
     def format_user_score_sums(user_score):
-      return {'user': user_score[0], 'total_score': user_score[1]}
+      (user, score) = user_score
+      return {'user': user, 'total_score': score}
 
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 0405bab..3e3e547 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -139,7 +139,8 @@ def run(argv=None):
 
   with beam.Pipeline(argv=pipeline_args) as p:
     def format_user_score_sums(user_score):
-      return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+      (user, score) = user_score
+      return 'user: %s, total_score: %s' % (user, score)
 
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 165237d..3f3ef03 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,7 +105,8 @@ def run(argv=None):  # pylint: disable=missing-docstring
     coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
 
     def x_coord_key(x_y_i):
-      return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+      (x, y, i) = x_y_i
+      return (x, (x, y, i))
 
     # Group each coordinate triplet by its x value, then write the coordinates
     # to the output file with an x-coordinate grouping per line.

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 71e71e3..637d10a 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -52,7 +52,8 @@ class TfIdfTest(unittest.TestCase):
   def test_tfidf_transform(self):
     with TestPipeline() as p:
       def re_key(word_uri_tfidf):
-        return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+        (word, (uri, tfidf)) = word_uri_tfidf
+        return (word, uri, tfidf)
 
       uri_to_line = p | 'create sample' >> beam.Create(
           [('1.txt', 'abc def ghi'),

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 03fcd8a..099fb08 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -180,7 +180,8 @@ def read_from_datastore(project, user_options, pipeline_options):
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
-    return (word_ones[0], sum(word_ones[1]))
+    (word, ones) = word_ones
+    return (word, sum(ones))
 
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -190,8 +191,9 @@ def read_from_datastore(project, user_options, pipeline_options):
             | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   output = counts | 'format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index fdfa286..b07b98d 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -95,22 +95,28 @@ def run(argv=None, assert_results=None):
 
     # Prepare tab-delimited output; something like this:
     # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
-    tsv_lines = grouped | beam.Map(
-        lambda name_email_phone_snailmail: '\t'.join(
-            ['"%s"' % name_email_phone_snailmail[0],
-             '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
-             '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
-             '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
+    def format_as_tsv(name_email_phone_snailmail):
+      (name, (email, phone, snailmail)) = name_email_phone_snailmail
+      return '\t'.join(
+          ['"%s"' % name,
+           '"%s"' % ','.join(email),
+           '"%s"' % ','.join(phone),
+           '"%s"' % next(iter(snailmail), '')])
+
+    tsv_lines = grouped | beam.Map(format_as_tsv)
 
     # Compute some stats about our database of people.
-    def without_email(name_email_phone_snailmail1):
-      return not next(iter(name_email_phone_snailmail1[1][0]), None)
+    def without_email(name_email_phone_snailmail):
+      (_, (email, _, _)) = name_email_phone_snailmail
+      return not next(iter(email), None)
 
-    def without_phones(name_email_phone_snailmail2):
-      return not next(iter(name_email_phone_snailmail2[1][1]), None)
+    def without_phones(name_email_phone_snailmail):
+      (_, (_, phone, _)) = name_email_phone_snailmail
+      return not next(iter(phone), None)
 
-    def without_address(name_e_p_snailmail):
-      return not next(iter(name_e_p_snailmail[1][2]), None)
+    def without_address(name_email_phone_snailmail):
+      (_, (_, _, snailmail)) = name_email_phone_snailmail
+      return not next(iter(snailmail), None)
 
     luddites = grouped | beam.Filter(  # People without email.
         without_email)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index fe7929e..e3df3a8 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -120,10 +120,12 @@ class CountWords(beam.PTransform):
 
   def expand(self, pcoll):
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
@@ -169,7 +171,7 @@ def run(argv=None):
     (character_count
      | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
      | beam.GroupByKey()
-     | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
+     | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
      | 'write chars' >> WriteToText(known_args.output + '-chars'))
 
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 01118b3..54abd8c 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,8 +535,9 @@ def examples_wordcount_templated(renames):
   lines = p | 'Read' >> ReadFromText(wordcount_options.input)
   # [END example_wordcount_templated]
 
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   (
       lines
@@ -614,8 +615,9 @@ def examples_wordcount_debugging(renames):
             [('Flourish', 3), ('stomach', 1)]))
     # [END example_wordcount_debugging_assert]
 
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = (filtered_words
               | 'format' >> beam.Map(format_result)
@@ -1126,7 +1128,8 @@ def model_group_by_key(contents, output_path):
   import apache_beam as beam
   with TestPipeline() as p:  # Use TestPipeline for testing.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     words_and_counts = (
         p

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index d1af4ce..df8a99b 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -59,7 +59,8 @@ def run(argv=None):
 
     # Capitalize the characters in each line.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     transformed = (lines
                    # Use a pre-defined function that imports the re package.

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index f88b942..4c7eee1 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -70,7 +70,8 @@ def run(argv=None):
 
     # Capitalize the characters in each line.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     transformed = (lines
                    | 'Split' >> (beam.FlatMap(find_words)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 60f1ffe..b1c4a5e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -89,7 +89,8 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
-    return (word_ones[0], sum(word_ones[1]))
+    (word, ones) = word_ones
+    return (word, sum(ones))
 
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -99,8 +100,9 @@ def run(argv=None):
             | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   output = counts | 'format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 7d18c0d..6ff8f26 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -94,7 +94,8 @@ class CountWords(beam.PTransform):
   """
   def expand(self, pcoll):
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
@@ -144,8 +145,9 @@ def run(argv=None):
     # Format the counts into a PCollection of strings and write the output using
     # a "Write" transform that has side effects.
     # pylint: disable=unused-variable
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = (filtered_words
               | 'format' >> beam.Map(format_result)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 54fd1f1..390c8c0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,8 +106,9 @@ def run(argv=None):
         | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
     # Format the counts into a PCollection of strings.
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = counts | 'Format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 6a7e269..647781f 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
   def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
-    def _pair_tag_with_value(k_v, tag):
-      (key, value) = k_v
+    def _pair_tag_with_value(key_value, tag):
+      (key, value) = key_value
       return (key, (tag, value))
 
     # Creates the key, value pairs for the output PCollection. Values are either
     # lists or dicts (per the class docstring), initialized by the result of
     # result_ctor(result_ctor_arg).
-    def _merge_tagged_vals_under_key(k_grouped, result_ctor,
+    def _merge_tagged_vals_under_key(key_grouped, result_ctor,
                                      result_ctor_arg):
-      (key, grouped) = k_grouped
+      (key, grouped) = key_grouped
       result_value = result_ctor(result_ctor_arg)
       for tag, value in grouped:
         result_value[tag].append(value)