You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/12 22:54:53 UTC
[17/18] beam git commit: Change the examples with long-lambdas which were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: this still keeps the lambdas which are short enough and those use indexing (as required).
Change the examples with long-lambdas which were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: this still keeps the lambdas which are short enough and those use indexing (as required).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b0ad58c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b0ad58c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b0ad58c
Branch: refs/heads/master
Commit: 3b0ad58cd68ee0790ac1f10ddffa96a866d85a0c
Parents: cf2be41
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 8 12:56:27 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 3 +-
.../examples/complete/game/leader_board.py | 3 +-
.../examples/complete/game/user_score.py | 3 +-
.../complete/juliaset/juliaset/juliaset.py | 3 +-
.../apache_beam/examples/complete/tfidf_test.py | 3 +-
.../examples/cookbook/datastore_wordcount.py | 8 ++++--
.../examples/cookbook/mergecontacts.py | 30 ++++++++++++--------
.../examples/cookbook/multiple_output_pardo.py | 10 ++++---
.../apache_beam/examples/snippets/snippets.py | 13 +++++----
.../apache_beam/examples/streaming_wordcount.py | 3 +-
.../apache_beam/examples/windowed_wordcount.py | 3 +-
sdks/python/apache_beam/examples/wordcount.py | 8 ++++--
.../apache_beam/examples/wordcount_debugging.py | 8 ++++--
.../apache_beam/examples/wordcount_minimal.py | 5 ++--
sdks/python/apache_beam/transforms/util.py | 8 +++---
15 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c09b78e..b556e65 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -46,7 +46,8 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
def format_result(prefix_candidates):
- return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
+ (prefix, candidates) = prefix_candidates
+ return '%s: %s' % (prefix, candidates)
(p # pylint: disable=expression-not-assigned
| 'read' >> ReadFromText(known_args.input)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index c1b15e9..a5bde05 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -331,7 +331,8 @@ def run(argv=None):
# Get user scores and write the results to BigQuery
def format_user_score_sums(user_score):
- return {'user': user_score[0], 'total_score': user_score[1]}
+ (user, score) = user_score
+ return {'user': user, 'total_score': score}
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 0405bab..3e3e547 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -139,7 +139,8 @@ def run(argv=None):
with beam.Pipeline(argv=pipeline_args) as p:
def format_user_score_sums(user_score):
- return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+ (user, score) = user_score
+ return 'user: %s, total_score: %s' % (user, score)
(p # pylint: disable=expression-not-assigned
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 165237d..3f3ef03 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,7 +105,8 @@ def run(argv=None): # pylint: disable=missing-docstring
coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
def x_coord_key(x_y_i):
- return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+ (x, y, i) = x_y_i
+ return (x, (x, y, i))
# Group each coordinate triplet by its x value, then write the coordinates
# to the output file with an x-coordinate grouping per line.
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 71e71e3..637d10a 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -52,7 +52,8 @@ class TfIdfTest(unittest.TestCase):
def test_tfidf_transform(self):
with TestPipeline() as p:
def re_key(word_uri_tfidf):
- return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+ (word, (uri, tfidf)) = word_uri_tfidf
+ return (word, uri, tfidf)
uri_to_line = p | 'create sample' >> beam.Create(
[('1.txt', 'abc def ghi'),
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 03fcd8a..099fb08 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -180,7 +180,8 @@ def read_from_datastore(project, user_options, pipeline_options):
# Count the occurrences of each word.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -190,8 +191,9 @@ def read_from_datastore(project, user_options, pipeline_options):
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index fdfa286..b07b98d 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -95,22 +95,28 @@ def run(argv=None, assert_results=None):
# Prepare tab-delimited output; something like this:
# "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
- tsv_lines = grouped | beam.Map(
- lambda name_email_phone_snailmail: '\t'.join(
- ['"%s"' % name_email_phone_snailmail[0],
- '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
- '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
- '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
+ def format_as_tsv(name_email_phone_snailmail):
+ (name, (email, phone, snailmail)) = name_email_phone_snailmail
+ return '\t'.join(
+ ['"%s"' % name,
+ '"%s"' % ','.join(email),
+ '"%s"' % ','.join(phone),
+ '"%s"' % next(iter(snailmail), '')])
+
+ tsv_lines = grouped | beam.Map(format_as_tsv)
# Compute some stats about our database of people.
- def without_email(name_email_phone_snailmail1):
- return not next(iter(name_email_phone_snailmail1[1][0]), None)
+ def without_email(name_email_phone_snailmail):
+ (_, (email, _, _)) = name_email_phone_snailmail
+ return not next(iter(email), None)
- def without_phones(name_email_phone_snailmail2):
- return not next(iter(name_email_phone_snailmail2[1][1]), None)
+ def without_phones(name_email_phone_snailmail):
+ (_, (_, phone, _)) = name_email_phone_snailmail
+ return not next(iter(phone), None)
- def without_address(name_e_p_snailmail):
- return not next(iter(name_e_p_snailmail[1][2]), None)
+ def without_address(name_email_phone_snailmail):
+ (_, (_, _, snailmail)) = name_email_phone_snailmail
+ return not next(iter(snailmail), None)
luddites = grouped | beam.Filter( # People without email.
without_email)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index fe7929e..e3df3a8 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -120,10 +120,12 @@ class CountWords(beam.PTransform):
def expand(self, pcoll):
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
return (pcoll
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
@@ -169,7 +171,7 @@ def run(argv=None):
(character_count
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
| 'write chars' >> WriteToText(known_args.output + '-chars'))
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 01118b3..54abd8c 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,8 +535,9 @@ def examples_wordcount_templated(renames):
lines = p | 'Read' >> ReadFromText(wordcount_options.input)
# [END example_wordcount_templated]
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
(
lines
@@ -614,8 +615,9 @@ def examples_wordcount_debugging(renames):
[('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = (filtered_words
| 'format' >> beam.Map(format_result)
@@ -1126,7 +1128,8 @@ def model_group_by_key(contents, output_path):
import apache_beam as beam
with TestPipeline() as p: # Use TestPipeline for testing.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
words_and_counts = (
p
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index d1af4ce..df8a99b 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -59,7 +59,8 @@ def run(argv=None):
# Capitalize the characters in each line.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
transformed = (lines
# Use a pre-defined function that imports the re package.
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index f88b942..4c7eee1 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -70,7 +70,8 @@ def run(argv=None):
# Capitalize the characters in each line.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
transformed = (lines
| 'Split' >> (beam.FlatMap(find_words)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 60f1ffe..b1c4a5e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -89,7 +89,8 @@ def run(argv=None):
# Count the occurrences of each word.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -99,8 +100,9 @@ def run(argv=None):
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 7d18c0d..6ff8f26 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -94,7 +94,8 @@ class CountWords(beam.PTransform):
"""
def expand(self, pcoll):
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
@@ -144,8 +145,9 @@ def run(argv=None):
# Format the counts into a PCollection of strings and write the output using
# a "Write" transform that has side effects.
# pylint: disable=unused-variable
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = (filtered_words
| 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 54fd1f1..390c8c0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,8 +106,9 @@ def run(argv=None):
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'Format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 6a7e269..647781f 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
def expand(self, pcolls):
"""Performs CoGroupByKey on argument pcolls; see class docstring."""
# For associating values in K-V pairs with the PCollections they came from.
- def _pair_tag_with_value(k_v, tag):
- (key, value) = k_v
+ def _pair_tag_with_value(key_value, tag):
+ (key, value) = key_value
return (key, (tag, value))
# Creates the key, value pairs for the output PCollection. Values are either
# lists or dicts (per the class docstring), initialized by the result of
# result_ctor(result_ctor_arg).
- def _merge_tagged_vals_under_key(k_grouped, result_ctor,
+ def _merge_tagged_vals_under_key(key_grouped, result_ctor,
result_ctor_arg):
- (key, grouped) = k_grouped
+ (key, grouped) = key_grouped
result_value = result_ctor(result_ctor_arg)
for tag, value in grouped:
result_value[tag].append(value)