Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/23 17:19:58 UTC
[1/3] beam git commit: Automatically convert examples to use with syntax.
Repository: beam
Updated Branches:
refs/heads/master d0601b30c -> 474345f59
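
The commit replaces the explicit construct/run/wait pattern with a context
manager throughout the files below. A minimal sketch of the two styles
(illustrative only; the transform chain here is made up, not taken from the
diff):

  import apache_beam as beam

  # Before: construction and execution are separate, explicit steps.
  p = beam.Pipeline()
  p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)  # pylint: disable=expression-not-assigned
  p.run().wait_until_finish()

  # After: exiting the block runs the pipeline and waits for completion.
  with beam.Pipeline() as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)  # pylint: disable=expression-not-assigned
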
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f7b51a7..6654fef 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -119,7 +119,6 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({'A', 'C'}, set(all_capitals))
def test_pardo_with_label(self):
- # pylint: disable=line-too-long
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
result = words | 'CountUniqueLetters' >> beam.Map(
@@ -129,41 +128,41 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({1, 2, 4}, set(result))
def test_pardo_side_input(self):
- p = TestPipeline()
- words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
-
- # [START model_pardo_side_input]
- # Callable takes additional arguments.
- def filter_using_length(word, lower_bound, upper_bound=float('inf')):
- if lower_bound <= len(word) <= upper_bound:
- yield word
-
- # Construct a deferred side input.
- avg_word_len = (words
- | beam.Map(len)
- | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
-
- # Call with explicit side inputs.
- small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
-
- # A single deferred side input.
- larger_than_average = (words | 'large' >> beam.FlatMap(
- filter_using_length,
- lower_bound=pvalue.AsSingleton(avg_word_len)))
-
- # Mix and match.
- small_but_nontrivial = words | beam.FlatMap(filter_using_length,
- lower_bound=2,
- upper_bound=pvalue.AsSingleton(
- avg_word_len))
- # [END model_pardo_side_input]
-
- assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
- assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
- label='larger_than_average')
- assert_that(small_but_nontrivial, equal_to(['bb']),
- label='small_but_not_trivial')
- p.run()
+ # pylint: disable=line-too-long
+ with TestPipeline() as p:
+ words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
+
+ # [START model_pardo_side_input]
+ # Callable takes additional arguments.
+ def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+ if lower_bound <= len(word) <= upper_bound:
+ yield word
+
+ # Construct a deferred side input.
+ avg_word_len = (words
+ | beam.Map(len)
+ | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
+
+ # Call with explicit side inputs.
+ small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
+
+ # A single deferred side input.
+ larger_than_average = (words | 'large' >> beam.FlatMap(
+ filter_using_length,
+ lower_bound=pvalue.AsSingleton(avg_word_len)))
+
+ # Mix and match.
+ small_but_nontrivial = words | beam.FlatMap(
+ filter_using_length,
+ lower_bound=2,
+ upper_bound=pvalue.AsSingleton(avg_word_len))
+ # [END model_pardo_side_input]
+
+ assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+ assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+ label='larger_than_average')
+ assert_that(small_but_nontrivial, equal_to(['bb']),
+ label='small_but_not_trivial')
def test_pardo_side_input_dofn(self):
words = ['a', 'bb', 'ccc', 'dddd']
@@ -307,10 +306,9 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_off(self):
# pylint: disable=expression-not-assigned
- p = TestPipeline()
- # [START type_hints_runtime_off]
- p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
- p.run()
+ with TestPipeline() as p:
+ # [START type_hints_runtime_off]
+ p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
# [END type_hints_runtime_off]
def test_runtime_checks_on(self):
@@ -323,47 +321,45 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_runtime_on]
def test_deterministic_key(self):
- p = TestPipeline()
- lines = (p | beam.Create(
- ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
+ with TestPipeline() as p:
+ lines = (p | beam.Create(
+ ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
- # For pickling
- global Player # pylint: disable=global-variable-not-assigned
+ # For pickling
+ global Player # pylint: disable=global-variable-not-assigned
- # [START type_hints_deterministic_key]
- class Player(object):
- def __init__(self, team, name):
- self.team = team
- self.name = name
+ # [START type_hints_deterministic_key]
+ class Player(object):
+ def __init__(self, team, name):
+ self.team = team
+ self.name = name
- class PlayerCoder(beam.coders.Coder):
- def encode(self, player):
- return '%s:%s' % (player.team, player.name)
+ class PlayerCoder(beam.coders.Coder):
+ def encode(self, player):
+ return '%s:%s' % (player.team, player.name)
- def decode(self, s):
- return Player(*s.split(':'))
+ def decode(self, s):
+ return Player(*s.split(':'))
- def is_deterministic(self):
- return True
+ def is_deterministic(self):
+ return True
- beam.coders.registry.register_coder(Player, PlayerCoder)
+ beam.coders.registry.register_coder(Player, PlayerCoder)
- def parse_player_and_score(csv):
- name, team, score = csv.split(',')
- return Player(team, name), int(score)
+ def parse_player_and_score(csv):
+ name, team, score = csv.split(',')
+ return Player(team, name), int(score)
- totals = (
- lines
- | beam.Map(parse_player_and_score)
- | beam.CombinePerKey(sum).with_input_types(
- beam.typehints.Tuple[Player, int]))
- # [END type_hints_deterministic_key]
+ totals = (
+ lines
+ | beam.Map(parse_player_and_score)
+ | beam.CombinePerKey(sum).with_input_types(
+ beam.typehints.Tuple[Player, int]))
+ # [END type_hints_deterministic_key]
- assert_that(
- totals | beam.Map(lambda (k, v): (k.name, v)),
- equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
-
- p.run()
+ assert_that(
+ totals | beam.Map(lambda (k, v): (k.name, v)),
+ equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
class SnippetsTest(unittest.TestCase):
@@ -802,109 +798,104 @@ class CombineTest(unittest.TestCase):
self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
def test_setting_fixed_windows(self):
- p = TestPipeline()
- unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
- items = (unkeyed_items
- | 'key' >> beam.Map(
- lambda x: beam.window.TimestampedValue(('k', x), x)))
- # [START setting_fixed_windows]
- from apache_beam import window
- fixed_windowed_items = (
- items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
- # [END setting_fixed_windows]
- summed = (fixed_windowed_items
- | 'group' >> beam.GroupByKey()
- | 'combine' >> beam.CombineValues(sum))
- unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- assert_that(unkeyed, equal_to([110, 215, 120]))
- p.run()
+ with TestPipeline() as p:
+ unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
+ items = (unkeyed_items
+ | 'key' >> beam.Map(
+ lambda x: beam.window.TimestampedValue(('k', x), x)))
+ # [START setting_fixed_windows]
+ from apache_beam import window
+ fixed_windowed_items = (
+ items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
+ # [END setting_fixed_windows]
+ summed = (fixed_windowed_items
+ | 'group' >> beam.GroupByKey()
+ | 'combine' >> beam.CombineValues(sum))
+ unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+ assert_that(unkeyed, equal_to([110, 215, 120]))
def test_setting_sliding_windows(self):
- p = TestPipeline()
- unkeyed_items = p | beam.Create([2, 16, 23])
- items = (unkeyed_items
- | 'key' >> beam.Map(
- lambda x: beam.window.TimestampedValue(('k', x), x)))
- # [START setting_sliding_windows]
- from apache_beam import window
- sliding_windowed_items = (
- items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
- # [END setting_sliding_windows]
- summed = (sliding_windowed_items
- | 'group' >> beam.GroupByKey()
- | 'combine' >> beam.CombineValues(sum))
- unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- assert_that(unkeyed,
- equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
- p.run()
+ with TestPipeline() as p:
+ unkeyed_items = p | beam.Create([2, 16, 23])
+ items = (unkeyed_items
+ | 'key' >> beam.Map(
+ lambda x: beam.window.TimestampedValue(('k', x), x)))
+ # [START setting_sliding_windows]
+ from apache_beam import window
+ sliding_windowed_items = (
+ items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
+ # [END setting_sliding_windows]
+ summed = (sliding_windowed_items
+ | 'group' >> beam.GroupByKey()
+ | 'combine' >> beam.CombineValues(sum))
+ unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+ assert_that(unkeyed,
+ equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
def test_setting_session_windows(self):
- p = TestPipeline()
- unkeyed_items = p | beam.Create([2, 11, 16, 27])
- items = (unkeyed_items
- | 'key' >> beam.Map(
- lambda x: beam.window.TimestampedValue(('k', x), x)))
- # [START setting_session_windows]
- from apache_beam import window
- session_windowed_items = (
- items | 'window' >> beam.WindowInto(window.Sessions(10)))
- # [END setting_session_windows]
- summed = (session_windowed_items
- | 'group' >> beam.GroupByKey()
- | 'combine' >> beam.CombineValues(sum))
- unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- assert_that(unkeyed,
- equal_to([29, 27]))
- p.run()
+ with TestPipeline() as p:
+ unkeyed_items = p | beam.Create([2, 11, 16, 27])
+ items = (unkeyed_items
+ | 'key' >> beam.Map(
+ lambda x: beam.window.TimestampedValue(('k', x), x)))
+ # [START setting_session_windows]
+ from apache_beam import window
+ session_windowed_items = (
+ items | 'window' >> beam.WindowInto(window.Sessions(10)))
+ # [END setting_session_windows]
+ summed = (session_windowed_items
+ | 'group' >> beam.GroupByKey()
+ | 'combine' >> beam.CombineValues(sum))
+ unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+ assert_that(unkeyed,
+ equal_to([29, 27]))
def test_setting_global_window(self):
- p = TestPipeline()
- unkeyed_items = p | beam.Create([2, 11, 16, 27])
- items = (unkeyed_items
- | 'key' >> beam.Map(
- lambda x: beam.window.TimestampedValue(('k', x), x)))
- # [START setting_global_window]
- from apache_beam import window
- session_windowed_items = (
- items | 'window' >> beam.WindowInto(window.GlobalWindows()))
- # [END setting_global_window]
- summed = (session_windowed_items
- | 'group' >> beam.GroupByKey()
- | 'combine' >> beam.CombineValues(sum))
- unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- assert_that(unkeyed, equal_to([56]))
- p.run()
+ with TestPipeline() as p:
+ unkeyed_items = p | beam.Create([2, 11, 16, 27])
+ items = (unkeyed_items
+ | 'key' >> beam.Map(
+ lambda x: beam.window.TimestampedValue(('k', x), x)))
+ # [START setting_global_window]
+ from apache_beam import window
+ session_windowed_items = (
+ items | 'window' >> beam.WindowInto(window.GlobalWindows()))
+ # [END setting_global_window]
+ summed = (session_windowed_items
+ | 'group' >> beam.GroupByKey()
+ | 'combine' >> beam.CombineValues(sum))
+ unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+ assert_that(unkeyed, equal_to([56]))
def test_setting_timestamp(self):
- p = TestPipeline()
- unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
- items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
+ with TestPipeline() as p:
+ unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
+ items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
- def extract_timestamp_from_log_entry(entry):
- return entry[1]
+ def extract_timestamp_from_log_entry(entry):
+ return entry[1]
- # [START setting_timestamp]
- class AddTimestampDoFn(beam.DoFn):
+ # [START setting_timestamp]
+ class AddTimestampDoFn(beam.DoFn):
- def process(self, element):
- # Extract the numeric Unix seconds-since-epoch timestamp to be
- # associated with the current log entry.
- unix_timestamp = extract_timestamp_from_log_entry(element)
- # Wrap and emit the current entry and new timestamp in a
- # TimestampedValue.
- yield beam.window.TimestampedValue(element, unix_timestamp)
-
- timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
- # [END setting_timestamp]
- fixed_windowed_items = (
- timestamped_items | 'window' >> beam.WindowInto(
- beam.window.FixedWindows(60)))
- summed = (fixed_windowed_items
- | 'group' >> beam.GroupByKey()
- | 'combine' >> beam.CombineValues(sum))
- unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
- assert_that(unkeyed, equal_to([42, 187]))
- p.run()
+ def process(self, element):
+ # Extract the numeric Unix seconds-since-epoch timestamp to be
+ # associated with the current log entry.
+ unix_timestamp = extract_timestamp_from_log_entry(element)
+ # Wrap and emit the current entry and new timestamp in a
+ # TimestampedValue.
+ yield beam.window.TimestampedValue(element, unix_timestamp)
+
+ timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
+ # [END setting_timestamp]
+ fixed_windowed_items = (
+ timestamped_items | 'window' >> beam.WindowInto(
+ beam.window.FixedWindows(60)))
+ summed = (fixed_windowed_items
+ | 'group' >> beam.GroupByKey()
+ | 'combine' >> beam.CombineValues(sum))
+ unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+ assert_that(unkeyed, equal_to([42, 187]))
class PTransformTest(unittest.TestCase):
@@ -919,10 +910,9 @@ class PTransformTest(unittest.TestCase):
return pcoll | beam.Map(lambda x: len(x))
# [END model_composite_transform]
- p = TestPipeline()
- lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
- assert_that(lengths, equal_to([1, 2, 3]))
- p.run()
+ with TestPipeline() as p:
+ lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
+ assert_that(lengths, equal_to([1, 2, 3]))
if __name__ == '__main__':
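
Note how assert_that moves inside the with block in the tests above:
assert_that only attaches a verification transform to the pipeline, and the
check fires when the pipeline actually runs, which now happens on context
exit. A minimal sketch of the resulting test shape (the import paths are an
assumption on my part, not taken from this diff):

  import unittest

  import apache_beam as beam
  from apache_beam.testing.test_pipeline import TestPipeline
  from apache_beam.testing.util import assert_that, equal_to

  class DoubleTest(unittest.TestCase):

    def test_double(self):
      with TestPipeline() as p:
        result = p | beam.Create([1, 2]) | beam.Map(lambda x: x * 2)
        # Registers the expectation; it is verified when the pipeline
        # runs, i.e. when the with block exits. No explicit p.run().
        assert_that(result, equal_to([2, 4]))
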
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d0cc8a2..ce43e1f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -41,22 +41,20 @@ def run(argv=None):
help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
+ with beam.Pipeline(argv=pipeline_args) as p:
- # Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read(
- beam.io.PubSubSource(known_args.input_topic))
+ # Read the text file[pattern] into a PCollection.
+ lines = p | beam.io.Read(
+ beam.io.PubSubSource(known_args.input_topic))
- # Capitalize the characters in each line.
- transformed = (lines
- | 'capitalize' >> (beam.Map(lambda x: x.upper())))
+ # Capitalize the characters in each line.
+ transformed = (lines
+ | 'capitalize' >> (beam.Map(lambda x: x.upper())))
- # Write to PubSub.
- # pylint: disable=expression-not-assigned
- transformed | beam.io.Write(
- beam.io.PubSubSink(known_args.output_topic))
-
- p.run().wait_until_finish()
+ # Write to PubSub.
+ # pylint: disable=expression-not-assigned
+ transformed | beam.io.Write(
+ beam.io.PubSubSink(known_args.output_topic))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 4b6aecc..e9d5dbe 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -44,29 +44,27 @@ def run(argv=None):
help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
-
- # Read the text file[pattern] into a PCollection.
- lines = p | 'read' >> beam.io.Read(
- beam.io.PubSubSource(known_args.input_topic))
-
- # Count the occurrences of each word.
- transformed = (lines
- | 'Split' >> (
- beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
- | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
- | beam.WindowInto(window.FixedWindows(15, 0))
- | 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
- | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
-
- # Write to PubSub.
- # pylint: disable=expression-not-assigned
- transformed | 'pubsub_write' >> beam.io.Write(
- beam.io.PubSubSink(known_args.output_topic))
-
- p.run().wait_until_finish()
+ with beam.Pipeline(argv=pipeline_args) as p:
+
+ # Read the text file[pattern] into a PCollection.
+ lines = p | 'read' >> beam.io.Read(
+ beam.io.PubSubSource(known_args.input_topic))
+
+ # Count the occurrences of each word.
+ transformed = (lines
+ | 'Split' >> (
+ beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
+ | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+ | beam.WindowInto(window.FixedWindows(15, 0))
+ | 'Group' >> beam.GroupByKey()
+ | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
+
+ # Write to PubSub.
+ # pylint: disable=expression-not-assigned
+ transformed | 'pubsub_write' >> beam.io.Write(
+ beam.io.PubSubSink(known_args.output_topic))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e7e542a..34dedb2 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -102,7 +102,6 @@ def run(argv=None):
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(known_args.output)
- # Actually run the pipeline (all operations above are deferred).
result = p.run()
result.wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ca9f7b6..c0ffd35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -118,35 +118,32 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- # Read the text file[pattern] into a PCollection, count the occurrences of
- # each word and filter by a list of words.
- filtered_words = (
- p | 'read' >> ReadFromText(known_args.input)
- | CountWords()
- | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
-
- # assert_that is a convenient PTransform that checks a PCollection has an
- # expected value. Asserts are best used in unit tests with small data sets but
- # is demonstrated here as a teaching tool.
- #
- # Note assert_that does not provide any output and that successful completion
- # of the Pipeline implies that the expectations were met. Learn more at
- # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
- # test your pipeline.
- assert_that(
- filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
-
- # Format the counts into a PCollection of strings and write the output using a
- # "Write" transform that has side effects.
- # pylint: disable=unused-variable
- output = (filtered_words
- | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
- | 'write' >> WriteToText(known_args.output))
-
- # Actually run the pipeline (all operations above are deferred).
- p.run().wait_until_finish()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ # Read the text file[pattern] into a PCollection, count the occurrences of
+ # each word and filter by a list of words.
+ filtered_words = (
+ p | 'read' >> ReadFromText(known_args.input)
+ | CountWords()
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
+
+ # assert_that is a convenient PTransform that checks a PCollection has an
+ # expected value. Asserts are best used in unit tests with small data sets
+ # but are demonstrated here as a teaching tool.
+ #
+ # Note assert_that does not provide any output and that successful
+ # completion of the Pipeline implies that the expectations were met. Learn
+ # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
+ # on how to best test your pipeline.
+ assert_that(
+ filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
+
+ # Format the counts into a PCollection of strings and write the output using
+ # a "Write" transform that has side effects.
+ # pylint: disable=unused-variable
+ output = (filtered_words
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'write' >> WriteToText(known_args.output))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 5109c08..76b0a22 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -92,28 +92,25 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
- # Read the text file[pattern] into a PCollection.
- lines = p | 'read' >> ReadFromText(known_args.input)
+ # Read the text file[pattern] into a PCollection.
+ lines = p | ReadFromText(known_args.input)
- # Count the occurrences of each word.
- counts = (lines
- | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- .with_output_types(unicode))
- | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
- | 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+ # Count the occurrences of each word.
+ counts = (
+ lines
+ | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ .with_output_types(unicode))
+ | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+ | 'GroupAndSum' >> beam.CombinePerKey(sum))
- # Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ # Format the counts into a PCollection of strings.
+ output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- output | 'write' >> WriteToText(known_args.output)
-
- # Actually run the pipeline (all operations above are deferred).
- p.run().wait_until_finish()
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | WriteToText(known_args.output)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/io/filebasedsink_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 1f6aeee..7c8ddb4 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -146,9 +146,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
sink = MyFileBasedSink(
temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
)
- p = TestPipeline()
- p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
- p.run()
+ with TestPipeline() as p:
+ p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
self.assertEqual(
open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
@@ -160,9 +159,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
coder=coders.ToStringCoder()
)
- p = TestPipeline()
- p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
- p.run()
+ with TestPipeline() as p:
+ p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
self.assertEqual(
open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
@@ -174,10 +172,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
num_shards=3,
shard_name_template='_NN_SSS_',
coder=coders.ToStringCoder())
- p = TestPipeline()
- p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
-
- p.run()
+ with TestPipeline() as p:
+ p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
concat = ''.join(
open(temp_path + '_03_%03d_.output' % shard_num).read()
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5048534..9093abf 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -28,19 +28,18 @@ to be executed for each node visited is specified through a runner object.
Typical usage:
# Create a pipeline object using a local runner for execution.
- p = beam.Pipeline('DirectRunner')
+ with beam.Pipeline('DirectRunner') as p:
- # Add to the pipeline a "Create" transform. When executed this
- # transform will produce a PCollection object with the specified values.
- pcoll = p | 'Create' >> beam.Create([1, 2, 3])
+ # Add to the pipeline a "Create" transform. When executed this
+ # transform will produce a PCollection object with the specified values.
+ pcoll = p | 'Create' >> beam.Create([1, 2, 3])
- # Another transform could be applied to pcoll, e.g., writing to a text file.
- # For other transforms, refer to transforms/ directory.
- pcoll | 'Write' >> beam.io.WriteToText('./output')
+ # Another transform could be applied to pcoll, e.g., writing to a text file.
+ # For other transforms, refer to transforms/ directory.
+ pcoll | 'Write' >> beam.io.WriteToText('./output')
- # run() will execute the DAG stored in the pipeline. The execution of the
- # nodes visited is done using the specified local runner.
- p.run()
+ # run() will execute the DAG stored in the pipeline. The execution of the
+ # nodes visited is done using the specified local runner.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 946a60a..c79fec8 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -247,26 +247,23 @@ class CombineTest(unittest.TestCase):
pipeline.run()
def test_tuple_combine_fn(self):
- p = TestPipeline()
- result = (
- p
- | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
- | beam.CombineGlobally(combine.TupleCombineFn(max,
- combine.MeanCombineFn(),
- sum)).without_defaults())
- assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
- p.run()
+ with TestPipeline() as p:
+ result = (
+ p
+ | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+ | beam.CombineGlobally(combine.TupleCombineFn(
+ max, combine.MeanCombineFn(), sum)).without_defaults())
+ assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
def test_tuple_combine_fn_without_defaults(self):
- p = TestPipeline()
- result = (
- p
- | Create([1, 1, 2, 3])
- | beam.CombineGlobally(
- combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
- .with_common_input()).without_defaults())
- assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
- p.run()
+ with TestPipeline() as p:
+ result = (
+ p
+ | Create([1, 1, 2, 3])
+ | beam.CombineGlobally(
+ combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
+ .with_common_input()).without_defaults())
+ assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
def test_to_list_and_to_dict(self):
pipeline = TestPipeline()
@@ -295,29 +292,26 @@ class CombineTest(unittest.TestCase):
pipeline.run()
def test_combine_globally_with_default(self):
- p = TestPipeline()
- assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
- p.run()
+ with TestPipeline() as p:
+ assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
def test_combine_globally_without_default(self):
- p = TestPipeline()
- result = p | Create([]) | CombineGlobally(sum).without_defaults()
- assert_that(result, equal_to([]))
- p.run()
+ with TestPipeline() as p:
+ result = p | Create([]) | CombineGlobally(sum).without_defaults()
+ assert_that(result, equal_to([]))
def test_combine_globally_with_default_side_input(self):
- class CombineWithSideInput(PTransform):
+ class SideInputCombine(PTransform):
def expand(self, pcoll):
side = pcoll | CombineGlobally(sum).as_singleton_view()
main = pcoll.pipeline | Create([None])
return main | Map(lambda _, s: s, side)
- p = TestPipeline()
- result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
- result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
- assert_that(result1, equal_to([0]), label='r1')
- assert_that(result2, equal_to([10]), label='r2')
- p.run()
+ with TestPipeline() as p:
+ result1 = p | 'i1' >> Create([]) | 'c1' >> SideInputCombine()
+ result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> SideInputCombine()
+ assert_that(result1, equal_to([0]), label='r1')
+ assert_that(result2, equal_to([10]), label='r2')
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index fd1bb9d..977a364 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -167,90 +167,85 @@ class WindowTest(unittest.TestCase):
| Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()])))
def test_sliding_windows(self):
- p = TestPipeline()
- pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
- result = (pcoll
- | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
- | GroupByKey()
- | reify_windows)
- expected = [('key @ [-2.0, 2.0)', [1]),
- ('key @ [0.0, 4.0)', [1, 2, 3]),
- ('key @ [2.0, 6.0)', [2, 3])]
- assert_that(result, equal_to(expected))
- p.run()
+ with TestPipeline() as p:
+ pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
+ result = (pcoll
+ | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
+ | GroupByKey()
+ | reify_windows)
+ expected = [('key @ [-2.0, 2.0)', [1]),
+ ('key @ [0.0, 4.0)', [1, 2, 3]),
+ ('key @ [2.0, 6.0)', [2, 3])]
+ assert_that(result, equal_to(expected))
def test_sessions(self):
- p = TestPipeline()
- pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
- result = (pcoll
- | 'w' >> WindowInto(Sessions(10))
- | GroupByKey()
- | sort_values
- | reify_windows)
- expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
- ('key @ [20.0, 45.0)', [20, 27, 35])]
- assert_that(result, equal_to(expected))
- p.run()
+ with TestPipeline() as p:
+ pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
+ result = (pcoll
+ | 'w' >> WindowInto(Sessions(10))
+ | GroupByKey()
+ | sort_values
+ | reify_windows)
+ expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
+ ('key @ [20.0, 45.0)', [20, 27, 35])]
+ assert_that(result, equal_to(expected))
def test_timestamped_value(self):
- p = TestPipeline()
- result = (p
- | 'start' >> Create([(k, k) for k in range(10)])
- | Map(lambda (x, t): TimestampedValue(x, t))
- | 'w' >> WindowInto(FixedWindows(5))
- | Map(lambda v: ('key', v))
- | GroupByKey())
- assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
- ('key', [5, 6, 7, 8, 9])]))
- p.run()
+ with TestPipeline() as p:
+ result = (p
+ | 'start' >> Create([(k, k) for k in range(10)])
+ | Map(lambda (x, t): TimestampedValue(x, t))
+ | 'w' >> WindowInto(FixedWindows(5))
+ | Map(lambda v: ('key', v))
+ | GroupByKey())
+ assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
+ ('key', [5, 6, 7, 8, 9])]))
def test_rewindow(self):
- p = TestPipeline()
- result = (p
- | Create([(k, k) for k in range(10)])
- | Map(lambda (x, t): TimestampedValue(x, t))
- | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
- # Per the model, each element is now duplicated across
- # three windows. Rewindowing must preserve this duplication.
- | 'rewindow' >> WindowInto(FixedWindows(5))
- | 'rewindow2' >> WindowInto(FixedWindows(5))
- | Map(lambda v: ('key', v))
- | GroupByKey())
- assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
- ('key', sorted([5, 6, 7, 8, 9] * 3))]))
- p.run()
+ with TestPipeline() as p:
+ result = (p
+ | Create([(k, k) for k in range(10)])
+ | Map(lambda (x, t): TimestampedValue(x, t))
+ | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+ # Per the model, each element is now duplicated across
+ # three windows. Rewindowing must preserve this duplication.
+ | 'rewindow' >> WindowInto(FixedWindows(5))
+ | 'rewindow2' >> WindowInto(FixedWindows(5))
+ | Map(lambda v: ('key', v))
+ | GroupByKey())
+ assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+ ('key', sorted([5, 6, 7, 8, 9] * 3))]))
def test_timestamped_with_combiners(self):
- p = TestPipeline()
- result = (p
- # Create some initial test values.
- | 'start' >> Create([(k, k) for k in range(10)])
- # The purpose of the WindowInto transform is to establish a
- # FixedWindows windowing function for the PCollection.
- # It does not bucket elements into windows since the timestamps
- # from Create are not spaced 5 ms apart and very likely they all
- # fall into the same window.
- | 'w' >> WindowInto(FixedWindows(5))
- # Generate timestamped values using the values as timestamps.
- # Now there are values 5 ms apart and since Map propagates the
- # windowing function from input to output the output PCollection
- # will have elements falling into different 5ms windows.
- | Map(lambda (x, t): TimestampedValue(x, t))
- # We add a 'key' to each value representing the index of the
- # window. This is important since there is no guarantee of
- # order for the elements of a PCollection.
- | Map(lambda v: (v / 5, v)))
- # Sum all elements associated with a key and window. Although it
- # is called CombinePerKey it is really CombinePerKeyAndWindow the
- # same way GroupByKey is really GroupByKeyAndWindow.
- sum_per_window = result | CombinePerKey(sum)
- # Compute mean per key and window.
- mean_per_window = result | combiners.Mean.PerKey()
- assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
- label='assert:sum')
- assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
- label='assert:mean')
- p.run()
+ with TestPipeline() as p:
+ result = (p
+ # Create some initial test values.
+ | 'start' >> Create([(k, k) for k in range(10)])
+ # The purpose of the WindowInto transform is to establish a
+ # FixedWindows windowing function for the PCollection.
+ # It does not bucket elements into windows since the timestamps
+ # from Create are not spaced 5 ms apart and very likely they all
+ # fall into the same window.
+ | 'w' >> WindowInto(FixedWindows(5))
+ # Generate timestamped values using the values as timestamps.
+ # Now there are values 5 ms apart and since Map propagates the
+ # windowing function from input to output the output PCollection
+ # will have elements falling into different 5ms windows.
+ | Map(lambda (x, t): TimestampedValue(x, t))
+ # We add a 'key' to each value representing the index of the
+ # window. This is important since there is no guarantee of
+ # order for the elements of a PCollection.
+ | Map(lambda v: (v / 5, v)))
+ # Sum all elements associated with a key and window. Although it
+ # is called CombinePerKey it is really CombinePerKeyAndWindow the
+ # same way GroupByKey is really GroupByKeyAndWindow.
+ sum_per_window = result | CombinePerKey(sum)
+ # Compute mean per key and window.
+ mean_per_window = result | combiners.Mean.PerKey()
+ assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
+ label='assert:sum')
+ assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
+ label='assert:mean')
class RunnerApiTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index e31b9cc..50f0deb 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -98,11 +98,10 @@ class WriteTest(unittest.TestCase):
return_write_results=True):
write_to_test_sink = WriteToTestSink(return_init_result,
return_write_results)
- p = TestPipeline()
- result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
+ with TestPipeline() as p:
+ result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
- assert_that(result, is_empty())
- p.run()
+ assert_that(result, is_empty())
sink = write_to_test_sink.last_sink
self.assertIsNotNone(sink)
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 589dc0e..c81ef32 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -168,12 +168,11 @@ class SideInputTest(unittest.TestCase):
@typehints.with_input_types(str, int)
def repeat(s, times):
return s * times
- p = TestPipeline()
- main_input = p | beam.Create(['a', 'bb', 'c'])
- side_input = p | 'side' >> beam.Create([3])
- result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
- assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
- p.run()
+ with TestPipeline() as p:
+ main_input = p | beam.Create(['a', 'bb', 'c'])
+ side_input = p | 'side' >> beam.Create([3])
+ result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
+ assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
bad_side_input = p | 'bad_side' >> beam.Create(['z'])
with self.assertRaises(typehints.TypeCheckError):
@@ -183,12 +182,11 @@ class SideInputTest(unittest.TestCase):
@typehints.with_input_types(str, typehints.Iterable[str])
def concat(glue, items):
return glue.join(sorted(items))
- p = TestPipeline()
- main_input = p | beam.Create(['a', 'bb', 'c'])
- side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
- result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
- assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
- p.run()
+ with TestPipeline() as p:
+ main_input = p | beam.Create(['a', 'bb', 'c'])
+ side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
+ result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
+ assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
with self.assertRaises(typehints.TypeCheckError):
[2/3] beam git commit: Automatically convert examples to use with syntax.
Posted by ro...@apache.org.
Automatically convert examples to use with syntax.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a62b4f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a62b4f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a62b4f7
Branch: refs/heads/master
Commit: 3a62b4f7b20bda2b3c4ca648f90988d387cfe20d
Parents: d0601b3
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 18 17:22:25 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 23 10:19:04 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 19 +-
.../examples/complete/autocomplete_test.py | 31 +-
.../examples/complete/estimate_pi.py | 11 +-
.../examples/complete/estimate_pi_test.py | 12 +-
.../examples/complete/game/hourly_team_score.py | 19 +-
.../examples/complete/game/user_score.py | 15 +-
.../complete/juliaset/juliaset/juliaset.py | 44 +-
.../apache_beam/examples/complete/tfidf.py | 21 +-
.../apache_beam/examples/complete/tfidf_test.py | 28 +-
.../examples/complete/top_wikipedia_sessions.py | 12 +-
.../complete/top_wikipedia_sessions_test.py | 9 +-
.../examples/cookbook/bigquery_schema.py | 159 +++---
.../examples/cookbook/bigquery_side_input.py | 51 +-
.../cookbook/bigquery_side_input_test.py | 39 +-
.../examples/cookbook/bigquery_tornadoes.py | 33 +-
.../cookbook/bigquery_tornadoes_test.py | 19 +-
.../apache_beam/examples/cookbook/coders.py | 16 +-
.../examples/cookbook/coders_test.py | 14 +-
.../examples/cookbook/custom_ptransform.py | 27 +-
.../examples/cookbook/custom_ptransform_test.py | 11 +-
.../examples/cookbook/datastore_wordcount.py | 20 +-
.../apache_beam/examples/cookbook/filters.py | 21 +-
.../examples/cookbook/group_with_coder.py | 43 +-
.../examples/cookbook/group_with_coder_test.py | 4 +-
.../examples/cookbook/mergecontacts.py | 115 +++--
.../examples/cookbook/mergecontacts_test.py | 3 +-
.../examples/cookbook/multiple_output_pardo.py | 72 ++-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 494 +++++++++----------
.../examples/snippets/snippets_test.py | 326 ++++++------
.../apache_beam/examples/streaming_wordcap.py | 24 +-
.../apache_beam/examples/streaming_wordcount.py | 44 +-
sdks/python/apache_beam/examples/wordcount.py | 1 -
.../apache_beam/examples/wordcount_debugging.py | 55 +--
.../apache_beam/examples/wordcount_minimal.py | 33 +-
.../python/apache_beam/io/filebasedsink_test.py | 16 +-
sdks/python/apache_beam/pipeline.py | 19 +-
.../apache_beam/transforms/combiners_test.py | 58 +--
.../apache_beam/transforms/window_test.py | 147 +++---
.../transforms/write_ptransform_test.py | 7 +-
.../typehints/typed_pipeline_test.py | 22 +-
41 files changed, 1005 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index f0acc3f..ab3397c 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -44,16 +44,15 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- (p # pylint: disable=expression-not-assigned
- | 'read' >> ReadFromText(known_args.input)
- | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- | 'TopPerPrefix' >> TopPerPrefix(5)
- | 'format' >> beam.Map(
- lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
- | 'write' >> WriteToText(known_args.output))
- p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ (p # pylint: disable=expression-not-assigned
+ | 'read' >> ReadFromText(known_args.input)
+ | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ | 'TopPerPrefix' >> TopPerPrefix(5)
+ | 'format' >> beam.Map(
+ lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+ | 'write' >> WriteToText(known_args.output))
class TopPerPrefix(beam.PTransform):
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 378d222..e2c84d6 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,22 +31,21 @@ class AutocompleteTest(unittest.TestCase):
WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
def test_top_prefixes(self):
- p = TestPipeline()
- words = p | beam.Create(self.WORDS)
- result = words | autocomplete.TopPerPrefix(5)
- # values must be hashable for now
- result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
- assert_that(result, equal_to(
- [
- ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
- ('to', ((3, 'to'), )),
- ('th', ((2, 'this'), (1, 'that'))),
- ('thi', ((2, 'this'), )),
- ('this', ((2, 'this'), )),
- ('tha', ((1, 'that'), )),
- ('that', ((1, 'that'), )),
- ]))
- p.run()
+ with TestPipeline() as p:
+ words = p | beam.Create(self.WORDS)
+ result = words | autocomplete.TopPerPrefix(5)
+ # values must be hashable for now
+ result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
+ assert_that(result, equal_to(
+ [
+ ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
+ ('to', ((3, 'to'), )),
+ ('th', ((2, 'this'), (1, 'that'))),
+ ('thi', ((2, 'this'), )),
+ ('this', ((2, 'this'), )),
+ ('tha', ((1, 'that'), )),
+ ('that', ((1, 'that'), )),
+ ]))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c709713..7e3c4cd 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -113,14 +113,11 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
- (p # pylint: disable=expression-not-assigned
- | EstimatePiTransform()
- | WriteToText(known_args.output, coder=JsonCoder()))
-
- # Actually run the pipeline (all operations above are deferred).
- p.run()
+ (p # pylint: disable=expression-not-assigned
+ | EstimatePiTransform()
+ | WriteToText(known_args.output, coder=JsonCoder()))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index fd51309..f1cbb0a 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -38,13 +38,13 @@ def in_between(lower, upper):
class EstimatePiTest(unittest.TestCase):
def test_basics(self):
- p = TestPipeline()
- result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000)
+ with TestPipeline() as p:
+ result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000)
- # Note: Probabilistically speaking this test can fail with a probability
- # that is very small (VERY) given that we run at least 500 thousand trials.
- assert_that(result, in_between(3.125, 3.155))
- p.run()
+ # Note: Probabilistically speaking this test can fail with a probability
+ # that is very small (VERY) given that we run at least 500 thousand
+ # trials.
+ assert_that(result, in_between(3.125, 3.155))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index e9d7188..9f398d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -276,18 +276,15 @@ def run(argv=None):
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
- p = beam.Pipeline(options=pipeline_options)
pipeline_options.view_as(SetupOptions).save_main_session = True
-
- (p # pylint: disable=expression-not-assigned
- | ReadFromText(known_args.input)
- | HourlyTeamScore(
- known_args.start_min, known_args.stop_min, known_args.window_duration)
- | WriteWindowedToBigQuery(
- known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
- result = p.run()
- result.wait_until_finish()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ (p # pylint: disable=expression-not-assigned
+ | ReadFromText(known_args.input)
+ | HourlyTeamScore(
+ known_args.start_min, known_args.stop_min, known_args.window_duration)
+ | WriteWindowedToBigQuery(
+ known_args.table_name, known_args.dataset, configure_bigquery_write()))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 389d2c6..c9f2738 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -201,16 +201,13 @@ def run(argv=None):
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
- p = beam.Pipeline(options=pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
- (p # pylint: disable=expression-not-assigned
- | ReadFromText(known_args.input) # Read events from a file and parse them.
- | UserScore()
- | WriteToBigQuery(
- known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
- result = p.run()
- result.wait_until_finish()
+ (p # pylint: disable=expression-not-assigned
+ | ReadFromText(known_args.input) # Read events from a file and parse them.
+ | UserScore()
+ | WriteToBigQuery(
+ known_args.table_name, known_args.dataset, configure_bigquery_write()))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 5ff2b78..61e3fd1 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -99,26 +99,24 @@ def run(argv=None): # pylint: disable=missing-docstring
help='Output file to write the resulting image to.')
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
- n = int(known_args.grid_size)
-
- coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
-
- # Group each coordinate triplet by its x value, then write the coordinates to
- # the output file with an x-coordinate grouping per line.
- # pylint: disable=expression-not-assigned
- (coordinates
- | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
- | 'x coord' >> beam.GroupByKey()
- | 'format' >> beam.Map(
- lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | WriteToText(known_args.coordinate_output))
- # pylint: enable=expression-not-assigned
- return p.run().wait_until_finish()
-
- # Optionally render the image and save it to a file.
- # TODO(silviuc): Add this functionality.
- # if p.options.image_output is not None:
- # julia_set_image = generate_julia_set_visualization(
- # file_with_coordinates, n, 100)
- # save_julia_set_visualization(p.options.image_output, julia_set_image)
+ with beam.Pipeline(argv=pipeline_args) as p:
+ n = int(known_args.grid_size)
+
+ coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
+
+ # Group each coordinate triplet by its x value, then write the coordinates
+ # to the output file with an x-coordinate grouping per line.
+ # pylint: disable=expression-not-assigned
+ (coordinates
+ | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord' >> beam.GroupByKey()
+ | 'format' >> beam.Map(
+ lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords))
+ | WriteToText(known_args.coordinate_output))
+
+ # Optionally render the image and save it to a file.
+ # TODO(silviuc): Add this functionality.
+ # if p.options.image_output is not None:
+ # julia_set_image = generate_julia_set_visualization(
+ # file_with_coordinates, n, 100)
+ # save_julia_set_visualization(p.options.image_output, julia_set_image)
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index a98d906..a88ff82 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -191,17 +191,16 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- # Read documents specified by the uris command line option.
- pcoll = read_documents(p, glob.glob(known_args.uris))
- # Compute TF-IDF information for each word.
- output = pcoll | TfIdf()
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- output | 'write' >> WriteToText(known_args.output)
- # Execute the pipeline and wait until it is completed.
- p.run().wait_until_finish()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ # Read documents specified by the uris command line option.
+ pcoll = read_documents(p, glob.glob(known_args.uris))
+ # Compute TF-IDF information for each word.
+ output = pcoll | TfIdf()
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | 'write' >> WriteToText(known_args.output)
+ # Execute the pipeline and wait until it is completed.
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index f177dfc..322426f 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -50,20 +50,20 @@ class TfIdfTest(unittest.TestCase):
f.write(contents)
def test_tfidf_transform(self):
- p = TestPipeline()
- uri_to_line = p | 'create sample' >> beam.Create(
- [('1.txt', 'abc def ghi'),
- ('2.txt', 'abc def'),
- ('3.txt', 'abc')])
- result = (
- uri_to_line
- | tfidf.TfIdf()
- | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
- assert_that(result, equal_to(EXPECTED_RESULTS))
- # Run the pipeline. Note that the assert_that above adds to the pipeline
- # a check that the result PCollection contains expected values. To actually
- # trigger the check the pipeline must be run.
- p.run()
+ with TestPipeline() as p:
+ uri_to_line = p | 'create sample' >> beam.Create(
+ [('1.txt', 'abc def ghi'),
+ ('2.txt', 'abc def'),
+ ('3.txt', 'abc')])
+ result = (
+ uri_to_line
+ | tfidf.TfIdf()
+ | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ assert_that(result, equal_to(EXPECTED_RESULTS))
+ # Run the pipeline. Note that the assert_that above adds to the pipeline
+ # a check that the result PCollection contains expected values.
+ # To actually trigger the check the pipeline must be run (e.g. by
+ # exiting the with context).
def test_basics(self):
# Setup the files with expected content.
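
As the comment above notes, assert_that attaches a verification transform that only fires when the pipeline runs; with the context-manager form the run happens implicitly at the end of the block. A self-contained sketch of the pattern used by these tests:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
  squares = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
  assert_that(squares, equal_to([1, 4, 9]))
  # No explicit p.run(): leaving the block runs the pipeline and the check.
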
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index aa48e4e..9a9ad78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -159,14 +159,12 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
- (p # pylint: disable=expression-not-assigned
- | ReadFromText(known_args.input)
- | ComputeTopSessions(known_args.sampling_threshold)
- | WriteToText(known_args.output))
-
- p.run()
+ (p # pylint: disable=expression-not-assigned
+ | ReadFromText(known_args.input)
+ | ComputeTopSessions(known_args.sampling_threshold)
+ | WriteToText(known_args.output))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 5fb6276..ced8a44 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -52,12 +52,11 @@ class ComputeTopSessionsTest(unittest.TestCase):
]
def test_compute_top_sessions(self):
- p = TestPipeline()
- edits = p | beam.Create(self.EDITS)
- result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
+ with TestPipeline() as p:
+ edits = p | beam.Create(self.EDITS)
+ result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
- assert_that(result, equal_to(self.EXPECTED))
- p.run()
+ assert_that(result, equal_to(self.EXPECTED))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 400189e..3a8af67 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -42,86 +42,85 @@ def run(argv=None):
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
-
- from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position
-
- table_schema = bigquery.TableSchema()
-
- # Fields that use standard types.
- kind_schema = bigquery.TableFieldSchema()
- kind_schema.name = 'kind'
- kind_schema.type = 'string'
- kind_schema.mode = 'nullable'
- table_schema.fields.append(kind_schema)
-
- full_name_schema = bigquery.TableFieldSchema()
- full_name_schema.name = 'fullName'
- full_name_schema.type = 'string'
- full_name_schema.mode = 'required'
- table_schema.fields.append(full_name_schema)
-
- age_schema = bigquery.TableFieldSchema()
- age_schema.name = 'age'
- age_schema.type = 'integer'
- age_schema.mode = 'nullable'
- table_schema.fields.append(age_schema)
-
- gender_schema = bigquery.TableFieldSchema()
- gender_schema.name = 'gender'
- gender_schema.type = 'string'
- gender_schema.mode = 'nullable'
- table_schema.fields.append(gender_schema)
-
- # A nested field
- phone_number_schema = bigquery.TableFieldSchema()
- phone_number_schema.name = 'phoneNumber'
- phone_number_schema.type = 'record'
- phone_number_schema.mode = 'nullable'
-
- area_code = bigquery.TableFieldSchema()
- area_code.name = 'areaCode'
- area_code.type = 'integer'
- area_code.mode = 'nullable'
- phone_number_schema.fields.append(area_code)
-
- number = bigquery.TableFieldSchema()
- number.name = 'number'
- number.type = 'integer'
- number.mode = 'nullable'
- phone_number_schema.fields.append(number)
- table_schema.fields.append(phone_number_schema)
-
- # A repeated field.
- children_schema = bigquery.TableFieldSchema()
- children_schema.name = 'children'
- children_schema.type = 'string'
- children_schema.mode = 'repeated'
- table_schema.fields.append(children_schema)
-
- def create_random_record(record_id):
- return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
- 'age': int(record_id) * 10, 'gender': 'male',
- 'phoneNumber': {
- 'areaCode': int(record_id) * 100,
- 'number': int(record_id) * 100000},
- 'children': ['child' + record_id + '1',
- 'child' + record_id + '2',
- 'child' + record_id + '3']
- }
-
- # pylint: disable=expression-not-assigned
- record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
- records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
- records | 'write' >> beam.io.Write(
- beam.io.BigQuerySink(
- known_args.output,
- schema=table_schema,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
-
- # Run the pipeline (all operations are deferred until run() is called).
- p.run()
+ with beam.Pipeline(argv=pipeline_args) as p:
+
+ from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position
+
+ table_schema = bigquery.TableSchema()
+
+ # Fields that use standard types.
+ kind_schema = bigquery.TableFieldSchema()
+ kind_schema.name = 'kind'
+ kind_schema.type = 'string'
+ kind_schema.mode = 'nullable'
+ table_schema.fields.append(kind_schema)
+
+ full_name_schema = bigquery.TableFieldSchema()
+ full_name_schema.name = 'fullName'
+ full_name_schema.type = 'string'
+ full_name_schema.mode = 'required'
+ table_schema.fields.append(full_name_schema)
+
+ age_schema = bigquery.TableFieldSchema()
+ age_schema.name = 'age'
+ age_schema.type = 'integer'
+ age_schema.mode = 'nullable'
+ table_schema.fields.append(age_schema)
+
+ gender_schema = bigquery.TableFieldSchema()
+ gender_schema.name = 'gender'
+ gender_schema.type = 'string'
+ gender_schema.mode = 'nullable'
+ table_schema.fields.append(gender_schema)
+
+ # A nested field
+ phone_number_schema = bigquery.TableFieldSchema()
+ phone_number_schema.name = 'phoneNumber'
+ phone_number_schema.type = 'record'
+ phone_number_schema.mode = 'nullable'
+
+ area_code = bigquery.TableFieldSchema()
+ area_code.name = 'areaCode'
+ area_code.type = 'integer'
+ area_code.mode = 'nullable'
+ phone_number_schema.fields.append(area_code)
+
+ number = bigquery.TableFieldSchema()
+ number.name = 'number'
+ number.type = 'integer'
+ number.mode = 'nullable'
+ phone_number_schema.fields.append(number)
+ table_schema.fields.append(phone_number_schema)
+
+ # A repeated field.
+ children_schema = bigquery.TableFieldSchema()
+ children_schema.name = 'children'
+ children_schema.type = 'string'
+ children_schema.mode = 'repeated'
+ table_schema.fields.append(children_schema)
+
+ def create_random_record(record_id):
+ return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
+ 'age': int(record_id) * 10, 'gender': 'male',
+ 'phoneNumber': {
+ 'areaCode': int(record_id) * 100,
+ 'number': int(record_id) * 100000},
+ 'children': ['child' + record_id + '1',
+ 'child' + record_id + '2',
+ 'child' + record_id + '3']
+ }
+
+ # pylint: disable=expression-not-assigned
+ record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+ records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
+ records | 'write' >> beam.io.Write(
+ beam.io.BigQuerySink(
+ known_args.output,
+ schema=table_schema,
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+ # All operations are deferred; the pipeline runs on exiting the with block.
if __name__ == '__main__':
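
The field-by-field schema construction above is deliberately explicit. Since these bigquery classes are apitools-generated message types, they should also accept field values as constructor keyword arguments; an equivalent shorthand (untested here) for two of the fields would be:

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='kind', type='string', mode='nullable'),
    bigquery.TableFieldSchema(
        name='phoneNumber', type='record', mode='nullable',
        fields=[
            bigquery.TableFieldSchema(
                name='areaCode', type='integer', mode='nullable'),
            bigquery.TableFieldSchema(
                name='number', type='integer', mode='nullable')])])
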
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 6b28818..9911a67 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -88,32 +88,31 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- group_ids = []
- for i in xrange(0, int(known_args.num_groups)):
- group_ids.append('id' + str(i))
-
- query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
- query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
- ignore_corpus = known_args.ignore_corpus
- ignore_word = known_args.ignore_word
-
- pcoll_corpus = p | 'read corpus' >> beam.io.Read(
- beam.io.BigQuerySource(query=query_corpus))
- pcoll_word = p | 'read_words' >> beam.io.Read(
- beam.io.BigQuerySource(query=query_word))
- pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
- [ignore_corpus])
- pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
- pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
-
- pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
- pcoll_ignore_corpus, pcoll_ignore_word)
-
- # pylint:disable=expression-not-assigned
- pcoll_groups | WriteToText(known_args.output)
- p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ group_ids = []
+ for i in xrange(0, int(known_args.num_groups)):
+ group_ids.append('id' + str(i))
+
+ query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
+ query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
+ ignore_corpus = known_args.ignore_corpus
+ ignore_word = known_args.ignore_word
+
+ pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+ beam.io.BigQuerySource(query=query_corpus))
+ pcoll_word = p | 'read_words' >> beam.io.Read(
+ beam.io.BigQuerySource(query=query_word))
+ pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+ [ignore_corpus])
+ pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+ pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
+
+ pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
+ pcoll_ignore_corpus, pcoll_ignore_word)
+
+ # pylint:disable=expression-not-assigned
+ pcoll_groups | WriteToText(known_args.output)
if __name__ == '__main__':
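
create_groups is defined elsewhere in bigquery_side_input.py; it combines the group-id PCollection with the others as side inputs. A generic sketch of that mechanism, with hypothetical names (not the example's actual code):

import apache_beam as beam
from apache_beam import pvalue

with beam.Pipeline() as p:
  ids = p | 'ids' >> beam.Create(['id0', 'id1'])
  corpora = p | 'corpora' >> beam.Create(['hamlet', 'othello'])

  # pvalue.AsList defers materializing corpora until the pipeline runs,
  # then passes the whole list to each call.
  paired = ids | beam.FlatMap(
      lambda gid, all_corpora: [(gid, c) for c in all_corpora],
      pvalue.AsList(corpora))
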
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index b11dc47..964b35b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -30,25 +30,26 @@ from apache_beam.testing.util import equal_to
class BigQuerySideInputTest(unittest.TestCase):
def test_create_groups(self):
- p = TestPipeline()
-
- group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
- corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
- [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
- words_pcoll = p | 'CreateWords' >> beam.Create(
- [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
- ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
- ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
-
- groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
- words_pcoll, ignore_corpus_pcoll,
- ignore_word_pcoll)
-
- assert_that(groups, equal_to(
- [('A', 'corpus2', 'word2'),
- ('B', 'corpus2', 'word2'),
- ('C', 'corpus2', 'word2')]))
- p.run()
+ with TestPipeline() as p:
+
+ group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
+ corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
+ [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
+ words_pcoll = p | 'CreateWords' >> beam.Create(
+ [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+ ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
+ ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
+
+ groups = bigquery_side_input.create_groups(group_ids_pcoll,
+ corpus_pcoll,
+ words_pcoll,
+ ignore_corpus_pcoll,
+ ignore_word_pcoll)
+
+ assert_that(groups, equal_to(
+ [('A', 'corpus2', 'word2'),
+ ('B', 'corpus2', 'word2'),
+ ('C', 'corpus2', 'word2')]))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index ed0c79a..d3b216e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -75,23 +75,22 @@ def run(argv=None):
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
-
- # Read the table rows into a PCollection.
- rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
- counts = count_tornadoes(rows)
-
- # Write the output using a "Write" transform that has side effects.
- # pylint: disable=expression-not-assigned
- counts | 'write' >> beam.io.Write(
- beam.io.BigQuerySink(
- known_args.output,
- schema='month:INTEGER, tornado_count:INTEGER',
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
-
- # Run the pipeline (all operations are deferred until run() is called).
- p.run().wait_until_finish()
+ with beam.Pipeline(argv=pipeline_args) as p:
+
+ # Read the table rows into a PCollection.
+ rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
+ counts = count_tornadoes(rows)
+
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ counts | 'write' >> beam.io.Write(
+ beam.io.BigQuerySink(
+ known_args.output,
+ schema='month:INTEGER, tornado_count:INTEGER',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+ # All operations are deferred; the pipeline runs on exiting the with block.
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index c926df8..45dcaba 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -30,16 +30,15 @@ from apache_beam.testing.util import equal_to
class BigQueryTornadoesTest(unittest.TestCase):
def test_basics(self):
- p = TestPipeline()
- rows = (p | 'create' >> beam.Create([
- {'month': 1, 'day': 1, 'tornado': False},
- {'month': 1, 'day': 2, 'tornado': True},
- {'month': 1, 'day': 3, 'tornado': True},
- {'month': 2, 'day': 1, 'tornado': True}]))
- results = bigquery_tornadoes.count_tornadoes(rows)
- assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
- {'month': 2, 'tornado_count': 1}]))
- p.run().wait_until_finish()
+ with TestPipeline() as p:
+ rows = (p | 'create' >> beam.Create([
+ {'month': 1, 'day': 1, 'tornado': False},
+ {'month': 1, 'day': 2, 'tornado': True},
+ {'month': 1, 'day': 3, 'tornado': True},
+ {'month': 2, 'day': 1, 'tornado': True}]))
+ results = bigquery_tornadoes.count_tornadoes(rows)
+ assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+ {'month': 2, 'tornado_count': 1}]))
if __name__ == '__main__':
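
count_tornadoes itself is outside this diff, but the test pins down its contract: rows whose 'tornado' field is true are counted per month. A plausible reconstruction, for orientation only (not the file's actual code):

import apache_beam as beam

def count_tornadoes(rows):
  return (rows
          | 'months' >> beam.FlatMap(
              lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
          | 'count' >> beam.CombinePerKey(sum)
          | 'format' >> beam.Map(
              lambda pair: {'month': pair[0], 'tornado_count': pair[1]}))
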
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index aeeb3c9..f97b0f2 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -85,15 +85,13 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- p = beam.Pipeline(argv=pipeline_args)
- (p # pylint: disable=expression-not-assigned
- | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
- | 'points' >> beam.FlatMap(compute_points)
- | beam.CombinePerKey(sum)
- | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
- p.run()
+
+ with beam.Pipeline(options=pipeline_options) as p:
+ (p # pylint: disable=expression-not-assigned
+ | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
+ | 'points' >> beam.FlatMap(compute_points)
+ | beam.CombinePerKey(sum)
+ | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
if __name__ == '__main__':
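
The JsonCoder passed to ReadFromText and WriteToText is defined earlier in coders.py; any object exposing encode and decode works in that position. A minimal sketch consistent with how it is used here:

import json

class JsonCoder(object):
  """Encodes and decodes one JSON record per text line."""

  def encode(self, x):
    return json.dumps(x)

  def decode(self, x):
    return json.loads(x)
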
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index f71dad8..988d3c9 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,13 +35,13 @@ class CodersTest(unittest.TestCase):
{'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
def test_compute_points(self):
- p = TestPipeline()
- records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
- result = (records
- | 'points' >> beam.FlatMap(coders.compute_points)
- | beam.CombinePerKey(sum))
- assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
- p.run()
+ with TestPipeline() as p:
+ records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
+ result = (records
+ | 'points' >> beam.FlatMap(coders.compute_points)
+ | beam.CombinePerKey(sum))
+ assert_that(result,
+ equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 609f2cd..aee69d2 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -47,11 +47,10 @@ class Count1(beam.PTransform):
def run_count1(known_args, options):
"""Runs the first example pipeline."""
logging.info('Running first pipeline')
- p = beam.Pipeline(options=options)
- (p | beam.io.ReadFromText(known_args.input)
- | Count1()
- | beam.io.WriteToText(known_args.output))
- p.run().wait_until_finish()
+ with beam.Pipeline(options=options) as p:
+ (p | beam.io.ReadFromText(known_args.input)
+ | Count1()
+ | beam.io.WriteToText(known_args.output))
@beam.ptransform_fn
@@ -66,11 +65,10 @@ def Count2(pcoll): # pylint: disable=invalid-name
def run_count2(known_args, options):
"""Runs the second example pipeline."""
logging.info('Running second pipeline')
- p = beam.Pipeline(options=options)
- (p | ReadFromText(known_args.input)
- | Count2() # pylint: disable=no-value-for-parameter
- | WriteToText(known_args.output))
- p.run().wait_until_finish()
+ with beam.Pipeline(options=options) as p:
+ (p | ReadFromText(known_args.input)
+ | Count2() # pylint: disable=no-value-for-parameter
+ | WriteToText(known_args.output))
@beam.ptransform_fn
@@ -93,11 +91,10 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name
def run_count3(known_args, options):
"""Runs the third example pipeline."""
logging.info('Running third pipeline')
- p = beam.Pipeline(options=options)
- (p | ReadFromText(known_args.input)
- | Count3(2) # pylint: disable=no-value-for-parameter
- | WriteToText(known_args.output))
- p.run()
+ with beam.Pipeline(options=options) as p:
+ (p | ReadFromText(known_args.input)
+ | Count3(2) # pylint: disable=no-value-for-parameter
+ | WriteToText(known_args.output))
def get_args(argv):
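
Count2 and Count3 above are built with the @beam.ptransform_fn decorator, which wraps a function taking a PCollection (plus optional arguments, as in Count3's factor) into a full PTransform. A minimal sketch of the same idea:

import apache_beam as beam

@beam.ptransform_fn
def Count(pcoll):  # pylint: disable=invalid-name
  return (pcoll
          | 'pair' >> beam.Map(lambda x: (x, 1))
          | 'sum' >> beam.CombinePerKey(sum))

# Applied like any transform; the decorator supplies the pcoll argument:
#   counts = words | Count()  # pylint: disable=no-value-for-parameter
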
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index c7c6dba..7aaccb4 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,12 +40,11 @@ class CustomCountTest(unittest.TestCase):
self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
def run_pipeline(self, count_implementation, factor=1):
- p = TestPipeline()
- words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
- result = words | count_implementation
- assert_that(
- result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
- p.run()
+ with TestPipeline() as p:
+ words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+ result = words | count_implementation
+ assert_that(
+ result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 411feb8..7161cff 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -135,18 +135,15 @@ class EntityWrapper(object):
def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
- p = beam.Pipeline(options=pipeline_options)
-
- # pylint: disable=expression-not-assigned
- (p
- | 'read' >> ReadFromText(user_options.input)
- | 'create entity' >> beam.Map(
- EntityWrapper(user_options.namespace, user_options.kind,
- user_options.ancestor).make_entity)
- | 'write to datastore' >> WriteToDatastore(project))
+ with beam.Pipeline(options=pipeline_options) as p:
- # Actually run the pipeline (all operations above are deferred).
- p.run().wait_until_finish()
+ # pylint: disable=expression-not-assigned
+ (p
+ | 'read' >> ReadFromText(user_options.input)
+ | 'create entity' >> beam.Map(
+ EntityWrapper(user_options.namespace, user_options.kind,
+ user_options.ancestor).make_entity)
+ | 'write to datastore' >> WriteToDatastore(project))
def make_ancestor_query(kind, namespace, ancestor):
@@ -196,7 +193,6 @@ def read_from_datastore(project, user_options, pipeline_options):
output | 'write' >> beam.io.WriteToText(file_path_prefix=user_options.output,
num_shards=user_options.num_shards)
- # Actually run the pipeline (all operations above are deferred).
result = p.run()
# Wait until completion, main thread would access post-completion job results.
result.wait_until_finish()
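
Note that read_from_datastore above keeps the explicit run(), unlike its sibling: the with form discards the PipelineResult, so code that must consult post-completion state needs the handle. A sketch of the distinction:

import apache_beam as beam

# Fine when nothing is needed after completion:
with beam.Pipeline() as p:
  p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)  # pylint: disable=expression-not-assigned

# Keep the handle when the result is consulted afterwards:
p = beam.Pipeline()
p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)  # pylint: disable=expression-not-assigned
result = p.run()
result.wait_until_finish()
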
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index 374001c..1fbf763 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -86,20 +86,17 @@ def run(argv=None):
help='Numeric value of month to filter on.')
known_args, pipeline_args = parser.parse_known_args(argv)
- p = beam.Pipeline(argv=pipeline_args)
+ with beam.Pipeline(argv=pipeline_args) as p:
- input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
+ input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
- # pylint: disable=expression-not-assigned
- (filter_cold_days(input_data, known_args.month_filter)
- | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
- known_args.output,
- schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
-
- # Actually run the pipeline (all operations above are deferred).
- p.run()
+ # pylint: disable=expression-not-assigned
+ (filter_cold_days(input_data, known_args.month_filter)
+ | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
+ known_args.output,
+ schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6bdadae..9c0d04b 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -95,28 +95,27 @@ def run(args=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- # Register the custom coder for the Player class, so that it will be used in
- # the computation.
- coders.registry.register_coder(Player, PlayerCoder)
-
- (p # pylint: disable=expression-not-assigned
- | ReadFromText(known_args.input)
- # The get_players function is annotated with a type hint above, so the type
- # system knows the output type of the following operation is a key-value pair
- # of a Player and an int. Please see the documentation for details on
- # types that are inferred automatically as well as other ways to specify
- # type hints.
- | beam.Map(get_players)
- # The output type hint of the previous step is used to infer that the key
- # type of the following operation is the Player type. Since a custom coder
- # is registered for the Player class above, a PlayerCoder will be used to
- # encode Player objects as keys for this combine operation.
- | beam.CombinePerKey(sum)
- | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | WriteToText(known_args.output))
- return p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ # Register the custom coder for the Player class, so that it will be used in
+ # the computation.
+ coders.registry.register_coder(Player, PlayerCoder)
+
+ (p # pylint: disable=expression-not-assigned
+ | ReadFromText(known_args.input)
+ # The get_players function is annotated with a type hint above, so the type
+ # system knows the output type of the following operation is a key-value
+ # pair of a Player and an int. Please see the documentation for details on
+ # types that are inferred automatically as well as other ways to specify
+ # type hints.
+ | beam.Map(get_players)
+ # The output type hint of the previous step is used to infer that the key
+ # type of the following operation is the Player type. Since a custom coder
+ # is registered for the Player class above, a PlayerCoder will be used to
+ # encode Player objects as keys for this combine operation.
+ | beam.CombinePerKey(sum)
+ | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+ | WriteToText(known_args.output))
if __name__ == '__main__':
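
Player and PlayerCoder are defined earlier in group_with_coder.py; the registry call is what makes Beam pick the custom coder wherever type hints infer a Player. A compact sketch of the registration pattern (bodies illustrative, not the file's exact code):

from apache_beam import coders

class Player(object):
  def __init__(self, name):
    self.name = name

class PlayerCoder(coders.Coder):
  def encode(self, o):
    return o.name

  def decode(self, s):
    return Player(s)

  def is_deterministic(self):
    return True  # Needed for use as a key in grouping/combining.

coders.registry.register_coder(Player, PlayerCoder)
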
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 4e87966..268ba8d 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase):
temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
group_with_coder.run([
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path]).wait_until_finish()
+ '--output=%s.result' % temp_path])
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
@@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase):
group_with_coder.run([
'--no_pipeline_type_check',
'--input=%s*' % temp_path,
- '--output=%s.result' % temp_path]).wait_until_finish()
+ '--output=%s.result' % temp_path])
# Parse result file and compare.
results = []
with open(temp_path + '.result-00000-of-00001') as result_file:
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 4f53c61..9acdd90 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -70,64 +70,63 @@ def run(argv=None, assert_results=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- # Helper: read a tab-separated key-value mapping from a text file, escape all
- # quotes/backslashes, and convert it a PCollection of (key, value) pairs.
- def read_kv_textfile(label, textfile):
- return (p
- | 'Read: %s' % label >> ReadFromText(textfile)
- | 'Backslash: %s' % label >> beam.Map(
- lambda x: re.sub(r'\\', r'\\\\', x))
- | 'EscapeQuotes: %s' % label >> beam.Map(
- lambda x: re.sub(r'"', r'\"', x))
- | 'Split: %s' % label >> beam.Map(
- lambda x: re.split(r'\t+', x, 1)))
-
- # Read input databases.
- email = read_kv_textfile('email', known_args.input_email)
- phone = read_kv_textfile('phone', known_args.input_phone)
- snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
-
- # Group together all entries under the same name.
- grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
-
- # Prepare tab-delimited output; something like this:
- # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
- tsv_lines = grouped | beam.Map(
- lambda (name, (email, phone, snailmail)): '\t'.join(
- ['"%s"' % name,
- '"%s"' % ','.join(email),
- '"%s"' % ','.join(phone),
- '"%s"' % next(iter(snailmail), '')]))
-
- # Compute some stats about our database of people.
- luddites = grouped | beam.Filter( # People without email.
- lambda (name, (email, phone, snailmail)): not next(iter(email), None))
- writers = grouped | beam.Filter( # People without phones.
- lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
- nomads = grouped | beam.Filter( # People without addresses.
- lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
-
- num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
- num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
- num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
-
- # Write tab-delimited output.
- # pylint: disable=expression-not-assigned
- tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
-
- # TODO(silviuc): Move the assert_results logic to the unit test.
- if assert_results is not None:
- expected_luddites, expected_writers, expected_nomads = assert_results
- assert_that(num_luddites, equal_to([expected_luddites]),
- label='assert:luddites')
- assert_that(num_writers, equal_to([expected_writers]),
- label='assert:writers')
- assert_that(num_nomads, equal_to([expected_nomads]),
- label='assert:nomads')
- # Execute pipeline.
- return p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ # Helper: read a tab-separated key-value mapping from a text file,
+ # escape all quotes/backslashes, and convert it to a PCollection of
+ # (key, value) pairs.
+ def read_kv_textfile(label, textfile):
+ return (p
+ | 'Read: %s' % label >> ReadFromText(textfile)
+ | 'Backslash: %s' % label >> beam.Map(
+ lambda x: re.sub(r'\\', r'\\\\', x))
+ | 'EscapeQuotes: %s' % label >> beam.Map(
+ lambda x: re.sub(r'"', r'\"', x))
+ | 'Split: %s' % label >> beam.Map(
+ lambda x: re.split(r'\t+', x, 1)))
+
+ # Read input databases.
+ email = read_kv_textfile('email', known_args.input_email)
+ phone = read_kv_textfile('phone', known_args.input_phone)
+ snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
+
+ # Group together all entries under the same name.
+ grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
+
+ # Prepare tab-delimited output; something like this:
+ # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
+ tsv_lines = grouped | beam.Map(
+ lambda (name, (email, phone, snailmail)): '\t'.join(
+ ['"%s"' % name,
+ '"%s"' % ','.join(email),
+ '"%s"' % ','.join(phone),
+ '"%s"' % next(iter(snailmail), '')]))
+
+ # Compute some stats about our database of people.
+ luddites = grouped | beam.Filter( # People without email.
+ lambda (name, (email, phone, snailmail)): not next(iter(email), None))
+ writers = grouped | beam.Filter( # People without phones.
+ lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
+ nomads = grouped | beam.Filter( # People without addresses.
+ lambda (name, (e, p, snailmail)): not next(iter(snailmail), None))
+
+ num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
+ num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
+ num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
+
+ # Write tab-delimited output.
+ # pylint: disable=expression-not-assigned
+ tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
+
+ # TODO(silviuc): Move the assert_results logic to the unit test.
+ if assert_results is not None:
+ expected_luddites, expected_writers, expected_nomads = assert_results
+ assert_that(num_luddites, equal_to([expected_luddites]),
+ label='assert:luddites')
+ assert_that(num_writers, equal_to([expected_writers]),
+ label='assert:writers')
+ assert_that(num_nomads, equal_to([expected_nomads]),
+ label='assert:nomads')
if __name__ == '__main__':
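
The CoGroupByKey above is what gives the later lambdas their (name, (email, phone, snailmail)) shape: each key maps to one iterable per input PCollection, in input order. A small sketch:

import apache_beam as beam

with beam.Pipeline() as p:
  emails = p | 'emails' >> beam.Create([('ann', 'ann@example.com')])
  phones = p | 'phones' >> beam.Create([('ann', '555-0100'),
                                        ('bob', '555-0101')])

  # Yields ('ann', (['ann@example.com'], ['555-0100'])) and
  # ('bob', ([], ['555-0101'])).
  grouped = (emails, phones) | beam.CoGroupByKey()
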
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index 09f71d3..b3be0dd 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -107,13 +107,12 @@ class MergeContactsTest(unittest.TestCase):
result_prefix = self.create_temp_file('')
- result = mergecontacts.run([
+ mergecontacts.run([
'--input_email=%s' % path_email,
'--input_phone=%s' % path_phone,
'--input_snailmail=%s' % path_snailmail,
'--output_tsv=%s.tsv' % result_prefix,
'--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
- result.wait_until_finish()
with open('%s.tsv-00000-of-00001' % result_prefix) as f:
contents = f.read()
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 9759f48..2316c66 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -141,43 +141,41 @@ def run(argv=None):
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
- lines = p | ReadFromText(known_args.input)
-
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- split_lines_result = (lines
- | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
- main='words'))
-
- # split_lines_result is an object of type DoOutputsTuple. It supports
- # accessing result in alternative ways.
- words, _, _ = split_lines_result
- short_words = split_lines_result[
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
- character_count = split_lines_result.tag_character_count
-
- # pylint: disable=expression-not-assigned
- (character_count
- | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
- | beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
- | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
- # pylint: disable=expression-not-assigned
- (short_words
- | 'count short words' >> CountWords()
- | 'write short words' >> WriteToText(
- known_args.output + '-short-words'))
-
- # pylint: disable=expression-not-assigned
- (words
- | 'count words' >> CountWords()
- | 'write words' >> WriteToText(known_args.output + '-words'))
-
- return p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ lines = p | ReadFromText(known_args.input)
+
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+ split_lines_result = (lines
+ | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+ main='words'))
+
+ # split_lines_result is an object of type DoOutputsTuple. It supports
+ # accessing the results in several alternative ways.
+ words, _, _ = split_lines_result
+ short_words = split_lines_result[
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+ character_count = split_lines_result.tag_character_count
+
+ # pylint: disable=expression-not-assigned
+ (character_count
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+ | beam.GroupByKey()
+ | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+ # pylint: disable=expression-not-assigned
+ (short_words
+ | 'count short words' >> CountWords()
+ | 'write short words' >> WriteToText(
+ known_args.output + '-short-words'))
+
+ # pylint: disable=expression-not-assigned
+ (words
+ | 'count words' >> CountWords()
+ | 'write words' >> WriteToText(known_args.output + '-words'))
if __name__ == '__main__':
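
SplitLinesToWordsFn lives earlier in the file; the mechanism behind with_outputs is a DoFn that wraps elements for named tags, with untagged yields going to the main output. A generic sketch, assuming the pvalue.TaggedOutput wrapper of current SDKs:

import apache_beam as beam
from apache_beam import pvalue

class SplitEvenOddFn(beam.DoFn):
  OUTPUT_TAG_ODD = 'tag_odd'

  def process(self, element):
    if element % 2:
      yield pvalue.TaggedOutput(self.OUTPUT_TAG_ODD, element)
    else:
      yield element  # Untagged: goes to the main output.

with beam.Pipeline() as p:
  nums = p | beam.Create([1, 2, 3, 4])
  results = nums | beam.ParDo(SplitEvenOddFn()).with_outputs(
      SplitEvenOddFn.OUTPUT_TAG_ODD, main='evens')
  evens, odds = results  # DoOutputsTuple unpacks as (main, tag...).
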
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 2c9111c..3ddd668 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase):
multiple_output_pardo.run([
'--input=%s*' % temp_path,
- '--output=%s' % result_prefix]).wait_until_finish()
+ '--output=%s' % result_prefix])
expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
with open(result_prefix + '-chars-00000-of-00001') as f:
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 7259572..70929e9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -147,18 +147,15 @@ def model_pipelines(argv):
pipeline_options = PipelineOptions(argv)
my_options = pipeline_options.view_as(MyOptions)
- p = beam.Pipeline(options=pipeline_options)
-
- (p
- | beam.io.ReadFromText(my_options.input)
- | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- | beam.Map(lambda x: (x, 1))
- | beam.combiners.Count.PerKey()
- | beam.io.WriteToText(my_options.output))
-
- result = p.run()
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ (p
+ | beam.io.ReadFromText(my_options.input)
+ | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+ | beam.Map(lambda x: (x, 1))
+ | beam.combiners.Count.PerKey()
+ | beam.io.WriteToText(my_options.output))
# [END model_pipelines]
- result.wait_until_finish()
def model_pcollection(argv):
@@ -178,21 +175,18 @@ def model_pcollection(argv):
my_options = pipeline_options.view_as(MyOptions)
# [START model_pcollection]
- p = beam.Pipeline(options=pipeline_options)
+ with beam.Pipeline(options=pipeline_options) as p:
- lines = (p
- | beam.Create([
- 'To be, or not to be: that is the question: ',
- 'Whether \'tis nobler in the mind to suffer ',
- 'The slings and arrows of outrageous fortune, ',
- 'Or to take arms against a sea of troubles, ']))
- # [END model_pcollection]
+ lines = (p
+ | beam.Create([
+ 'To be, or not to be: that is the question: ',
+ 'Whether \'tis nobler in the mind to suffer ',
+ 'The slings and arrows of outrageous fortune, ',
+ 'Or to take arms against a sea of troubles, ']))
+ # [END model_pcollection]
- (lines
- | beam.io.WriteToText(my_options.output))
-
- result = p.run()
- result.wait_until_finish()
+ (lines
+ | beam.io.WriteToText(my_options.output))
def pipeline_options_remote(argv):
@@ -297,12 +291,10 @@ def pipeline_options_command_line(argv):
known_args, pipeline_args = parser.parse_known_args(argv)
# Create the Pipeline with remaining arguments.
- p = beam.Pipeline(argv=pipeline_args)
- lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
- lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
- # [END pipeline_options_command_line]
-
- p.run().wait_until_finish()
+ with beam.Pipeline(argv=pipeline_args) as p:
+ lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
+ lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
+ # [END pipeline_options_command_line]
def pipeline_logging(lines, output):
@@ -329,13 +321,11 @@ def pipeline_logging(lines, output):
# Remaining WordCount example code ...
# [END pipeline_logging]
- p = TestPipeline() # Use TestPipeline for testing.
- (p
- | beam.Create(lines)
- | beam.ParDo(ExtractWordsFn())
- | beam.io.WriteToText(output))
-
- p.run()
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ (p
+ | beam.Create(lines)
+ | beam.ParDo(ExtractWordsFn())
+ | beam.io.WriteToText(output))
def pipeline_monitoring(renames):
@@ -385,20 +375,19 @@ def pipeline_monitoring(renames):
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(WordCountOptions)
- p = TestPipeline() # Use TestPipeline for testing.
+ with TestPipeline() as p: # Use TestPipeline for testing.
- # [START pipeline_monitoring_execution]
- (p
- # Read the lines of the input text.
- | 'ReadLines' >> beam.io.ReadFromText(options.input)
- # Count the words.
- | CountWords()
- # Write the formatted word counts to output.
- | 'WriteCounts' >> beam.io.WriteToText(options.output))
- # [END pipeline_monitoring_execution]
+ # [START pipeline_monitoring_execution]
+ (p
+ # Read the lines of the input text.
+ | 'ReadLines' >> beam.io.ReadFromText(options.input)
+ # Count the words.
+ | CountWords()
+ # Write the formatted word counts to output.
+ | 'WriteCounts' >> beam.io.WriteToText(options.output))
+ # [END pipeline_monitoring_execution]
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.visit(SnippetUtils.RenameFiles(renames))
def examples_wordcount_minimal(renames):
@@ -478,40 +467,39 @@ def examples_wordcount_wordcount(renames):
default='gs://my-bucket/input')
options = PipelineOptions(argv)
- p = beam.Pipeline(options=options)
- # [END examples_wordcount_wordcount_options]
+ with beam.Pipeline(options=options) as p:
+ # [END examples_wordcount_wordcount_options]
- lines = p | beam.io.ReadFromText(
- 'gs://dataflow-samples/shakespeare/kinglear.txt')
+ lines = p | beam.io.ReadFromText(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt')
- # [START examples_wordcount_wordcount_composite]
- class CountWords(beam.PTransform):
+ # [START examples_wordcount_wordcount_composite]
+ class CountWords(beam.PTransform):
- def expand(self, pcoll):
- return (pcoll
- # Convert lines of text into individual words.
- | 'ExtractWords' >> beam.FlatMap(
- lambda x: re.findall(r'[A-Za-z\']+', x))
+ def expand(self, pcoll):
+ return (pcoll
+ # Convert lines of text into individual words.
+ | 'ExtractWords' >> beam.FlatMap(
+ lambda x: re.findall(r'[A-Za-z\']+', x))
- # Count the number of times each word occurs.
- | beam.combiners.Count.PerElement())
+ # Count the number of times each word occurs.
+ | beam.combiners.Count.PerElement())
- counts = lines | CountWords()
- # [END examples_wordcount_wordcount_composite]
+ counts = lines | CountWords()
+ # [END examples_wordcount_wordcount_composite]
- # [START examples_wordcount_wordcount_dofn]
- class FormatAsTextFn(beam.DoFn):
+ # [START examples_wordcount_wordcount_dofn]
+ class FormatAsTextFn(beam.DoFn):
- def process(self, element):
- word, count = element
- yield '%s: %s' % (word, count)
+ def process(self, element):
+ word, count = element
+ yield '%s: %s' % (word, count)
- formatted = counts | beam.ParDo(FormatAsTextFn())
- # [END examples_wordcount_wordcount_dofn]
+ formatted = counts | beam.ParDo(FormatAsTextFn())
+ # [END examples_wordcount_wordcount_dofn]
- formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run().wait_until_finish()
+ formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
+ p.visit(SnippetUtils.RenameFiles(renames))
def examples_wordcount_debugging(renames):
@@ -558,27 +546,27 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_logging]
# [END example_wordcount_debugging_aggregators]
- p = TestPipeline() # Use TestPipeline for testing.
- filtered_words = (
- p
- | beam.io.ReadFromText(
- 'gs://dataflow-samples/shakespeare/kinglear.txt')
- | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- | beam.combiners.Count.PerElement()
- | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ filtered_words = (
+ p
+ | beam.io.ReadFromText(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt')
+ | 'ExtractWords' >> beam.FlatMap(
+ lambda x: re.findall(r'[A-Za-z\']+', x))
+ | beam.combiners.Count.PerElement()
+ | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
- # [START example_wordcount_debugging_assert]
- beam.testing.util.assert_that(
- filtered_words, beam.testing.util.equal_to(
- [('Flourish', 3), ('stomach', 1)]))
- # [END example_wordcount_debugging_assert]
+ # [START example_wordcount_debugging_assert]
+ beam.testing.util.assert_that(
+ filtered_words, beam.testing.util.equal_to(
+ [('Flourish', 3), ('stomach', 1)]))
+ # [END example_wordcount_debugging_assert]
- output = (filtered_words
- | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
- | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
+ output = (filtered_words
+ | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run()
+ p.visit(SnippetUtils.RenameFiles(renames))
import apache_beam as beam
@@ -659,16 +647,14 @@ def model_custom_source(count):
# Using the source in an example pipeline.
# [START model_custom_source_use_new_source]
- p = beam.Pipeline(options=PipelineOptions())
- numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
- # [END model_custom_source_use_new_source]
+ with beam.Pipeline(options=PipelineOptions()) as p:
+ numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
+ # [END model_custom_source_use_new_source]
- lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
- assert_that(
- lines, equal_to(
- ['line ' + str(number) for number in range(0, count)]))
-
- p.run().wait_until_finish()
+ lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+ assert_that(
+ lines, equal_to(
+ ['line ' + str(number) for number in range(0, count)]))
# We recommend users to start Source classes with an underscore to discourage
# using the Source class directly when a PTransform for the source is
@@ -796,14 +782,12 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
# Using the new sink in an example pipeline.
# [START model_custom_sink_use_new_sink]
- p = beam.Pipeline(options=PipelineOptions())
- kvs = p | 'CreateKVs' >> beam.Create(KVs)
+ with beam.Pipeline(options=PipelineOptions()) as p:
+ kvs = p | 'CreateKVs' >> beam.Create(KVs)
- kvs | 'WriteToSimpleKV' >> beam.io.Write(
- SimpleKVSink('http://url_to_simple_kv/', final_table_name))
- # [END model_custom_sink_use_new_sink]
-
- p.run().wait_until_finish()
+ kvs | 'WriteToSimpleKV' >> beam.io.Write(
+ SimpleKVSink('http://url_to_simple_kv/', final_table_name))
+ # [END model_custom_sink_use_new_sink]
# We recommend users to start Sink class names with an underscore to
# discourage using the Sink class directly when a PTransform for the sink is
@@ -828,13 +812,11 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
final_table_name = final_table_name_with_ptransform
# [START model_custom_sink_use_ptransform]
- p = beam.Pipeline(options=PipelineOptions())
- kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
- kvs | 'WriteToSimpleKV' >> WriteToKVSink(
- 'http://url_to_simple_kv/', final_table_name)
- # [END model_custom_sink_use_ptransform]
-
- p.run().wait_until_finish()
+ with beam.Pipeline(options=PipelineOptions()) as p:
+ kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
+ kvs | 'WriteToSimpleKV' >> WriteToKVSink(
+ 'http://url_to_simple_kv/', final_table_name)
+ # [END model_custom_sink_use_ptransform]
def model_textio(renames):
@@ -847,37 +829,35 @@ def model_textio(renames):
from apache_beam.options.pipeline_options import PipelineOptions
# [START model_textio_read]
- p = beam.Pipeline(options=PipelineOptions())
- # [START model_pipelineio_read]
- lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
- # [END model_pipelineio_read]
- # [END model_textio_read]
-
- # [START model_textio_write]
- filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
- # [START model_pipelineio_write]
- filtered_words | 'WriteToText' >> beam.io.WriteToText(
- '/path/to/numbers', file_name_suffix='.csv')
- # [END model_pipelineio_write]
- # [END model_textio_write]
+ with beam.Pipeline(options=PipelineOptions()) as p:
+ # [START model_pipelineio_read]
+ lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
+ # [END model_pipelineio_read]
+ # [END model_textio_read]
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run().wait_until_finish()
+ # [START model_textio_write]
+ filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
+ # [START model_pipelineio_write]
+ filtered_words | 'WriteToText' >> beam.io.WriteToText(
+ '/path/to/numbers', file_name_suffix='.csv')
+ # [END model_pipelineio_write]
+ # [END model_textio_write]
+
+ p.visit(SnippetUtils.RenameFiles(renames))
def model_textio_compressed(renames, expected):
"""Using a Read Transform to read compressed text files."""
- p = TestPipeline()
+ with TestPipeline() as p:
- # [START model_textio_write_compressed]
- lines = p | 'ReadFromText' >> beam.io.ReadFromText(
- '/path/to/input-*.csv.gz',
- compression_type=beam.io.filesystem.CompressionTypes.GZIP)
- # [END model_textio_write_compressed]
+ # [START model_textio_write_compressed]
+ lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+ '/path/to/input-*.csv.gz',
+ compression_type=beam.io.filesystem.CompressionTypes.GZIP)
+ # [END model_textio_write_compressed]
- assert_that(lines, equal_to(expected))
- p.visit(SnippetUtils.RenameFiles(renames))
- p.run().wait_until_finish()
+ assert_that(lines, equal_to(expected))
+ p.visit(SnippetUtils.RenameFiles(renames))
def model_datastoreio():
@@ -987,43 +967,40 @@ def model_composite_transform_example(contents, output_path):
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
- p = TestPipeline() # Use TestPipeline for testing.
- (p
- | beam.Create(contents)
- | CountWords()
- | beam.io.WriteToText(output_path))
- p.run()
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ (p
+ | beam.Create(contents)
+ | CountWords()
+ | beam.io.WriteToText(output_path))
def model_multiple_pcollections_flatten(contents, output_path):
"""Merging a PCollection with Flatten."""
some_hash_fn = lambda s: ord(s[0])
- import apache_beam as beam
- p = TestPipeline() # Use TestPipeline for testing.
partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
-
- # Partition into deciles
- partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3)
- pcoll1 = partitioned[0]
- pcoll2 = partitioned[1]
- pcoll3 = partitioned[2]
-
- # Flatten them back into 1
-
- # A collection of PCollection objects can be represented simply
- # as a tuple (or list) of PCollections.
- # (The SDK for Python has no separate type to store multiple
- # PCollection objects, whether containing the same or different
- # types.)
- # [START model_multiple_pcollections_flatten]
- merged = (
- (pcoll1, pcoll2, pcoll3)
- # A list of tuples can be "piped" directly into a Flatten transform.
- | beam.Flatten())
- # [END model_multiple_pcollections_flatten]
- merged | beam.io.WriteToText(output_path)
-
- p.run()
+ import apache_beam as beam
+ with TestPipeline() as p: # Use TestPipeline for testing.
+
+ # Partition into three groups.
+ partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3)
+ pcoll1 = partitioned[0]
+ pcoll2 = partitioned[1]
+ pcoll3 = partitioned[2]
+
+ # Flatten them back into a single PCollection.
+
+ # A collection of PCollection objects can be represented simply
+ # as a tuple (or list) of PCollections.
+ # (The SDK for Python has no separate type to store multiple
+ # PCollection objects, whether containing the same or different
+ # types.)
+ # [START model_multiple_pcollections_flatten]
+ merged = (
+ (pcoll1, pcoll2, pcoll3)
+ # A list of tuples can be "piped" directly into a Flatten transform.
+ | beam.Flatten())
+ # [END model_multiple_pcollections_flatten]
+ merged | beam.io.WriteToText(output_path)
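
The key point in the flatten snippet is that a plain tuple (or list) of
PCollections can sit on the left of the pipe into Flatten. A minimal
self-contained sketch:

    import apache_beam as beam

    with beam.Pipeline() as p:
      pcoll1 = p | 'one' >> beam.Create(['a', 'b'])
      pcoll2 = p | 'two' >> beam.Create(['c'])
      # A tuple of PCollections pipes directly into Flatten.
      merged = (pcoll1, pcoll2) | beam.Flatten()
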
def model_multiple_pcollections_partition(contents, output_path):
@@ -1034,25 +1011,23 @@ def model_multiple_pcollections_partition(contents, output_path):
"""Assume i in [0,100)."""
return i
import apache_beam as beam
- p = TestPipeline() # Use TestPipeline for testing.
+ with TestPipeline() as p: # Use TestPipeline for testing.
- students = p | beam.Create(contents)
+ students = p | beam.Create(contents)
- # [START model_multiple_pcollections_partition]
- def partition_fn(student, num_partitions):
- return int(get_percentile(student) * num_partitions / 100)
+ # [START model_multiple_pcollections_partition]
+ def partition_fn(student, num_partitions):
+ return int(get_percentile(student) * num_partitions / 100)
- by_decile = students | beam.Partition(partition_fn, 10)
- # [END model_multiple_pcollections_partition]
- # [START model_multiple_pcollections_partition_40th]
- fortieth_percentile = by_decile[4]
- # [END model_multiple_pcollections_partition_40th]
+ by_decile = students | beam.Partition(partition_fn, 10)
+ # [END model_multiple_pcollections_partition]
+ # [START model_multiple_pcollections_partition_40th]
+ fortieth_percentile = by_decile[4]
+ # [END model_multiple_pcollections_partition_40th]
- ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
- | beam.Flatten()
- | beam.io.WriteToText(output_path))
-
- p.run()
+ ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
+ | beam.Flatten()
+ | beam.io.WriteToText(output_path))
def model_group_by_key(contents, output_path):
@@ -1060,58 +1035,56 @@ def model_group_by_key(contents, output_path):
import re
import apache_beam as beam
- p = TestPipeline() # Use TestPipeline for testing.
- words_and_counts = (
- p
- | beam.Create(contents)
- | beam.FlatMap(lambda x: re.findall(r'\w+', x))
- | 'one word' >> beam.Map(lambda w: (w, 1)))
- # GroupByKey accepts a PCollection of (w, 1) and
- # outputs a PCollection of (w, (1, 1, ...)).
- # (A key/value pair is just a tuple in Python.)
- # This is a somewhat forced example, since one could
- # simply use beam.combiners.Count.PerElement here.
- # [START model_group_by_key_transform]
- grouped_words = words_and_counts | beam.GroupByKey()
- # [END model_group_by_key_transform]
- (grouped_words
- | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
- | beam.io.WriteToText(output_path))
- p.run()
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ words_and_counts = (
+ p
+ | beam.Create(contents)
+ | beam.FlatMap(lambda x: re.findall(r'\w+', x))
+ | 'one word' >> beam.Map(lambda w: (w, 1)))
+ # GroupByKey accepts a PCollection of (w, 1) and
+ # outputs a PCollection of (w, (1, 1, ...)).
+ # (A key/value pair is just a tuple in Python.)
+ # This is a somewhat forced example, since one could
+ # simply use beam.combiners.Count.PerElement here.
+ # [START model_group_by_key_transform]
+ grouped_words = words_and_counts | beam.GroupByKey()
+ # [END model_group_by_key_transform]
+ (grouped_words
+ | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
+ | beam.io.WriteToText(output_path))
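
As the comments above note, GroupByKey turns a PCollection of
(key, value) pairs into (key, iterable-of-values) pairs, and
beam.combiners.Count.PerElement would be the idiomatic shortcut for this
particular count. A tiny sketch:

    import apache_beam as beam

    with beam.Pipeline() as p:
      pairs = p | beam.Create([('cat', 1), ('dog', 1), ('cat', 1)])
      grouped = pairs | beam.GroupByKey()  # e.g. ('cat', [1, 1])
      counts = grouped | beam.Map(lambda kv: (kv[0], len(list(kv[1]))))
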
def model_co_group_by_key_tuple(email_list, phone_list, output_path):
"""Applying a CoGroupByKey Transform to a tuple."""
import apache_beam as beam
- p = TestPipeline() # Use TestPipeline for testing.
- # [START model_group_by_key_cogroupbykey_tuple]
- # Each data set is represented by key-value pairs in separate PCollections.
- # Both data sets share a common key type (in this example str).
- # The email_list contains values such as: ('joe', 'joe@example.com') with
- # multiple possible values for each key.
- # The phone_list contains values such as: ('mary': '111-222-3333') with
- # multiple possible values for each key.
- emails = p | 'email' >> beam.Create(email_list)
- phones = p | 'phone' >> beam.Create(phone_list)
- # The result PCollection contains one key-value element for each key in the
- # input PCollections. The key of the pair will be the key from the input and
- # the value will be a dictionary with two entries: 'emails' - an iterable of
- # all values for the current key in the emails PCollection and 'phones': an
- # iterable of all values for the current key in the phones PCollection.
- # For instance, if 'emails' contained ('joe', 'joe@example.com') and
- # ('joe', 'joe@gmail.com'), then 'result' will contain the element
- # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
- result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
-
- def join_info((name, info)):
- return '; '.join(['%s' % name,
- '%s' % ','.join(info['emails']),
- '%s' % ','.join(info['phones'])])
-
- contact_lines = result | beam.Map(join_info)
- # [END model_group_by_key_cogroupbykey_tuple]
- contact_lines | beam.io.WriteToText(output_path)
- p.run()
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ # [START model_group_by_key_cogroupbykey_tuple]
+ # Each data set is represented by key-value pairs in separate PCollections.
+ # Both data sets share a common key type (in this example str).
+ # The email_list contains values such as: ('joe', 'joe@example.com') with
+ # multiple possible values for each key.
+ # The phone_list contains values such as: ('mary', '111-222-3333') with
+ # multiple possible values for each key.
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
+ # The result PCollection contains one key-value element for each key in the
+ # input PCollections. The key of the pair will be the key from the input and
+ # the value will be a dictionary with two entries: 'emails' - an iterable of
+ # all values for the current key in the emails PCollection and 'phones': an
+ # iterable of all values for the current key in the phones PCollection.
+ # For instance, if 'emails' contained ('joe', 'joe@example.com') and
+ # ('joe', 'joe@gmail.com'), then 'result' will contain the element
+ # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
+ result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
+
+ def join_info((name, info)):
+ return '; '.join(['%s' % name,
+ '%s' % ','.join(info['emails']),
+ '%s' % ','.join(info['phones'])])
+
+ contact_lines = result | beam.Map(join_info)
+ # [END model_group_by_key_cogroupbykey_tuple]
+ contact_lines | beam.io.WriteToText(output_path)
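
CoGroupByKey takes a dict mapping tag names to keyed PCollections and
emits, for each key, a dict of iterables under those same tags. A
compact sketch with made-up contacts:

    import apache_beam as beam

    with beam.Pipeline() as p:
      emails = p | 'email' >> beam.Create([('joe', 'joe@example.com')])
      phones = p | 'phone' >> beam.Create([('joe', '555-1212'),
                                           ('joe', '555-3434')])
      # Yields e.g. ('joe', {'emails': [...], 'phones': [...]})
      result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
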
def model_join_using_side_inputs(
@@ -1121,35 +1094,34 @@ def model_join_using_side_inputs(
import apache_beam as beam
from apache_beam.pvalue import AsIter
- p = TestPipeline() # Use TestPipeline for testing.
- # [START model_join_using_side_inputs]
- # This code performs a join by receiving the set of names as an input and
- # passing PCollections that contain emails and phone numbers as side inputs
- # instead of using CoGroupByKey.
- names = p | 'names' >> beam.Create(name_list)
- emails = p | 'email' >> beam.Create(email_list)
- phones = p | 'phone' >> beam.Create(phone_list)
-
- def join_info(name, emails, phone_numbers):
- filtered_emails = []
- for name_in_list, email in emails:
- if name_in_list == name:
- filtered_emails.append(email)
-
- filtered_phone_numbers = []
- for name_in_list, phone_number in phone_numbers:
- if name_in_list == name:
- filtered_phone_numbers.append(phone_number)
-
- return '; '.join(['%s' % name,
- '%s' % ','.join(filtered_emails),
- '%s' % ','.join(filtered_phone_numbers)])
-
- contact_lines = names | 'CreateContacts' >> beam.core.Map(
- join_info, AsIter(emails), AsIter(phones))
- # [END model_join_using_side_inputs]
- contact_lines | beam.io.WriteToText(output_path)
- p.run()
+ with TestPipeline() as p: # Use TestPipeline for testing.
+ # [START model_join_using_side_inputs]
+ # This code performs a join by receiving the set of names as an input and
+ # passing PCollections that contain emails and phone numbers as side inputs
+ # instead of using CoGroupByKey.
+ names = p | 'names' >> beam.Create(name_list)
+ emails = p | 'email' >> beam.Create(email_list)
+ phones = p | 'phone' >> beam.Create(phone_list)
+
+ def join_info(name, emails, phone_numbers):
+ filtered_emails = []
+ for name_in_list, email in emails:
+ if name_in_list == name:
+ filtered_emails.append(email)
+
+ filtered_phone_numbers = []
+ for name_in_list, phone_number in phone_numbers:
+ if name_in_list == name:
+ filtered_phone_numbers.append(phone_number)
+
+ return '; '.join(['%s' % name,
+ '%s' % ','.join(filtered_emails),
+ '%s' % ','.join(filtered_phone_numbers)])
+
+ contact_lines = names | 'CreateContacts' >> beam.core.Map(
+ join_info, AsIter(emails), AsIter(phones))
+ # [END model_join_using_side_inputs]
+ contact_lines | beam.io.WriteToText(output_path)
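
The side-input join above broadcasts the full emails and phones
collections to every element of names via AsIter, avoiding the
CoGroupByKey shuffle at the cost of per-element iteration. A
trimmed-down sketch of the same shape:

    import apache_beam as beam
    from apache_beam.pvalue import AsIter

    with beam.Pipeline() as p:
      names = p | 'names' >> beam.Create(['joe'])
      emails = p | 'email' >> beam.Create([('joe', 'joe@example.com')])

      def join_info(name, email_pairs):
        # email_pairs is the entire emails collection, seen by every name.
        matched = [e for n, e in email_pairs if n == name]
        return '%s; %s' % (name, ','.join(matched))

      contact_lines = names | beam.Map(join_info, AsIter(emails))
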
# [START model_library_transforms_keys]
[3/3] beam git commit: Closes #3180
Posted by ro...@apache.org.
Closes #3180
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/474345f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/474345f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/474345f5
Branch: refs/heads/master
Commit: 474345f5987e47a22d063c7bfcb3638c85a57e64
Parents: d0601b3 3a62b4f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 23 10:19:22 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 23 10:19:22 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 19 +-
.../examples/complete/autocomplete_test.py | 31 +-
.../examples/complete/estimate_pi.py | 11 +-
.../examples/complete/estimate_pi_test.py | 12 +-
.../examples/complete/game/hourly_team_score.py | 19 +-
.../examples/complete/game/user_score.py | 15 +-
.../complete/juliaset/juliaset/juliaset.py | 44 +-
.../apache_beam/examples/complete/tfidf.py | 21 +-
.../apache_beam/examples/complete/tfidf_test.py | 28 +-
.../examples/complete/top_wikipedia_sessions.py | 12 +-
.../complete/top_wikipedia_sessions_test.py | 9 +-
.../examples/cookbook/bigquery_schema.py | 159 +++---
.../examples/cookbook/bigquery_side_input.py | 51 +-
.../cookbook/bigquery_side_input_test.py | 39 +-
.../examples/cookbook/bigquery_tornadoes.py | 33 +-
.../cookbook/bigquery_tornadoes_test.py | 19 +-
.../apache_beam/examples/cookbook/coders.py | 16 +-
.../examples/cookbook/coders_test.py | 14 +-
.../examples/cookbook/custom_ptransform.py | 27 +-
.../examples/cookbook/custom_ptransform_test.py | 11 +-
.../examples/cookbook/datastore_wordcount.py | 20 +-
.../apache_beam/examples/cookbook/filters.py | 21 +-
.../examples/cookbook/group_with_coder.py | 43 +-
.../examples/cookbook/group_with_coder_test.py | 4 +-
.../examples/cookbook/mergecontacts.py | 115 +++--
.../examples/cookbook/mergecontacts_test.py | 3 +-
.../examples/cookbook/multiple_output_pardo.py | 72 ++-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets.py | 494 +++++++++----------
.../examples/snippets/snippets_test.py | 326 ++++++------
.../apache_beam/examples/streaming_wordcap.py | 24 +-
.../apache_beam/examples/streaming_wordcount.py | 44 +-
sdks/python/apache_beam/examples/wordcount.py | 1 -
.../apache_beam/examples/wordcount_debugging.py | 55 +--
.../apache_beam/examples/wordcount_minimal.py | 33 +-
.../python/apache_beam/io/filebasedsink_test.py | 16 +-
sdks/python/apache_beam/pipeline.py | 19 +-
.../apache_beam/transforms/combiners_test.py | 58 +--
.../apache_beam/transforms/window_test.py | 147 +++---
.../transforms/write_ptransform_test.py | 7 +-
.../typehints/typed_pipeline_test.py | 22 +-
41 files changed, 1005 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------