Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/23 17:19:58 UTC

[1/3] beam git commit: Automatically convert examples to use with syntax.

Repository: beam
Updated Branches:
  refs/heads/master d0601b30c -> 474345f59
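
The commit mechanically rewrites pipeline construction and execution to
use Python's context-manager ("with") syntax. A minimal sketch of the
before/after pattern, illustrative only and not taken verbatim from any
one file (pipeline_options stands in for a PipelineOptions instance, as
in the examples below):

  import apache_beam as beam

  # Before: build the pipeline, then run it explicitly and wait.
  p = beam.Pipeline(options=pipeline_options)
  p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
  p.run().wait_until_finish()

  # After: exiting the with block runs the pipeline and waits for it,
  # so the trailing run()/wait_until_finish() calls are dropped.
  with beam.Pipeline(options=pipeline_options) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)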


http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f7b51a7..6654fef 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -119,7 +119,6 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'A', 'C'}, set(all_capitals))
 
   def test_pardo_with_label(self):
-    # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
     result = words | 'CountUniqueLetters' >> beam.Map(
@@ -129,41 +128,41 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
-    p = TestPipeline()
-    words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
-
-    # [START model_pardo_side_input]
-    # Callable takes additional arguments.
-    def filter_using_length(word, lower_bound, upper_bound=float('inf')):
-      if lower_bound <= len(word) <= upper_bound:
-        yield word
-
-    # Construct a deferred side input.
-    avg_word_len = (words
-                    | beam.Map(len)
-                    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
-
-    # Call with explicit side inputs.
-    small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
-
-    # A single deferred side input.
-    larger_than_average = (words | 'large' >> beam.FlatMap(
-        filter_using_length,
-        lower_bound=pvalue.AsSingleton(avg_word_len)))
-
-    # Mix and match.
-    small_but_nontrivial = words | beam.FlatMap(filter_using_length,
-                                                lower_bound=2,
-                                                upper_bound=pvalue.AsSingleton(
-                                                    avg_word_len))
-    # [END model_pardo_side_input]
-
-    assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
-    assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
-                label='larger_than_average')
-    assert_that(small_but_nontrivial, equal_to(['bb']),
-                label='small_but_not_trivial')
-    p.run()
+    # pylint: disable=line-too-long
+    with TestPipeline() as p:
+      words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
+
+      # [START model_pardo_side_input]
+      # Callable takes additional arguments.
+      def filter_using_length(word, lower_bound, upper_bound=float('inf')):
+        if lower_bound <= len(word) <= upper_bound:
+          yield word
+
+      # Construct a deferred side input.
+      avg_word_len = (words
+                      | beam.Map(len)
+                      | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
+
+      # Call with explicit side inputs.
+      small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
+
+      # A single deferred side input.
+      larger_than_average = (words | 'large' >> beam.FlatMap(
+          filter_using_length,
+          lower_bound=pvalue.AsSingleton(avg_word_len)))
+
+      # Mix and match.
+      small_but_nontrivial = words | beam.FlatMap(
+          filter_using_length,
+          lower_bound=2,
+          upper_bound=pvalue.AsSingleton(avg_word_len))
+      # [END model_pardo_side_input]
+
+      assert_that(small_words, equal_to(['a', 'bb', 'ccc']))
+      assert_that(larger_than_average, equal_to(['ccc', 'dddd']),
+                  label='larger_than_average')
+      assert_that(small_but_nontrivial, equal_to(['bb']),
+                  label='small_but_not_trivial')
 
   def test_pardo_side_input_dofn(self):
     words = ['a', 'bb', 'ccc', 'dddd']
@@ -307,10 +306,9 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_off(self):
     # pylint: disable=expression-not-assigned
-    p = TestPipeline()
-    # [START type_hints_runtime_off]
-    p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
-    p.run()
+    with TestPipeline() as p:
+      # [START type_hints_runtime_off]
+      p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
     # [END type_hints_runtime_off]
 
   def test_runtime_checks_on(self):
@@ -323,47 +321,45 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-    p = TestPipeline()
-    lines = (p | beam.Create(
-        ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
+    with TestPipeline() as p:
+      lines = (p | beam.Create(
+          ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
-    # For pickling
-    global Player  # pylint: disable=global-variable-not-assigned
+      # For pickling
+      global Player  # pylint: disable=global-variable-not-assigned
 
-    # [START type_hints_deterministic_key]
-    class Player(object):
-      def __init__(self, team, name):
-        self.team = team
-        self.name = name
+      # [START type_hints_deterministic_key]
+      class Player(object):
+        def __init__(self, team, name):
+          self.team = team
+          self.name = name
 
-    class PlayerCoder(beam.coders.Coder):
-      def encode(self, player):
-        return '%s:%s' % (player.team, player.name)
+      class PlayerCoder(beam.coders.Coder):
+        def encode(self, player):
+          return '%s:%s' % (player.team, player.name)
 
-      def decode(self, s):
-        return Player(*s.split(':'))
+        def decode(self, s):
+          return Player(*s.split(':'))
 
-      def is_deterministic(self):
-        return True
+        def is_deterministic(self):
+          return True
 
-    beam.coders.registry.register_coder(Player, PlayerCoder)
+      beam.coders.registry.register_coder(Player, PlayerCoder)
 
-    def parse_player_and_score(csv):
-      name, team, score = csv.split(',')
-      return Player(team, name), int(score)
+      def parse_player_and_score(csv):
+        name, team, score = csv.split(',')
+        return Player(team, name), int(score)
 
-    totals = (
-        lines
-        | beam.Map(parse_player_and_score)
-        | beam.CombinePerKey(sum).with_input_types(
-            beam.typehints.Tuple[Player, int]))
-    # [END type_hints_deterministic_key]
+      totals = (
+          lines
+          | beam.Map(parse_player_and_score)
+          | beam.CombinePerKey(sum).with_input_types(
+              beam.typehints.Tuple[Player, int]))
+      # [END type_hints_deterministic_key]
 
-    assert_that(
-        totals | beam.Map(lambda (k, v): (k.name, v)),
-        equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
-
-    p.run()
+      assert_that(
+          totals | beam.Map(lambda (k, v): (k.name, v)),
+          equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
 
 
 class SnippetsTest(unittest.TestCase):
@@ -802,109 +798,104 @@ class CombineTest(unittest.TestCase):
     self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
 
   def test_setting_fixed_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_fixed_windows]
-    from apache_beam import window
-    fixed_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
-    # [END setting_fixed_windows]
-    summed = (fixed_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([110, 215, 120]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_fixed_windows]
+      from apache_beam import window
+      fixed_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
+      # [END setting_fixed_windows]
+      summed = (fixed_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([110, 215, 120]))
 
   def test_setting_sliding_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 16, 23])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_sliding_windows]
-    from apache_beam import window
-    sliding_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
-    # [END setting_sliding_windows]
-    summed = (sliding_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed,
-                equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 16, 23])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_sliding_windows]
+      from apache_beam import window
+      sliding_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
+      # [END setting_sliding_windows]
+      summed = (sliding_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed,
+                  equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
 
   def test_setting_session_windows(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 11, 16, 27])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_session_windows]
-    from apache_beam import window
-    session_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.Sessions(10)))
-    # [END setting_session_windows]
-    summed = (session_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed,
-                equal_to([29, 27]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 11, 16, 27])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_session_windows]
+      from apache_beam import window
+      session_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.Sessions(10)))
+      # [END setting_session_windows]
+      summed = (session_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed,
+                  equal_to([29, 27]))
 
   def test_setting_global_window(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([2, 11, 16, 27])
-    items = (unkeyed_items
-             | 'key' >> beam.Map(
-                 lambda x: beam.window.TimestampedValue(('k', x), x)))
-    # [START setting_global_window]
-    from apache_beam import window
-    session_windowed_items = (
-        items | 'window' >> beam.WindowInto(window.GlobalWindows()))
-    # [END setting_global_window]
-    summed = (session_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([56]))
-    p.run()
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([2, 11, 16, 27])
+      items = (unkeyed_items
+               | 'key' >> beam.Map(
+                   lambda x: beam.window.TimestampedValue(('k', x), x)))
+      # [START setting_global_window]
+      from apache_beam import window
+      session_windowed_items = (
+          items | 'window' >> beam.WindowInto(window.GlobalWindows()))
+      # [END setting_global_window]
+      summed = (session_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([56]))
 
   def test_setting_timestamp(self):
-    p = TestPipeline()
-    unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
-    items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
+    with TestPipeline() as p:
+      unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
+      items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
 
-    def extract_timestamp_from_log_entry(entry):
-      return entry[1]
+      def extract_timestamp_from_log_entry(entry):
+        return entry[1]
 
-    # [START setting_timestamp]
-    class AddTimestampDoFn(beam.DoFn):
+      # [START setting_timestamp]
+      class AddTimestampDoFn(beam.DoFn):
 
-      def process(self, element):
-        # Extract the numeric Unix seconds-since-epoch timestamp to be
-        # associated with the current log entry.
-        unix_timestamp = extract_timestamp_from_log_entry(element)
-        # Wrap and emit the current entry and new timestamp in a
-        # TimestampedValue.
-        yield beam.window.TimestampedValue(element, unix_timestamp)
-
-    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
-    # [END setting_timestamp]
-    fixed_windowed_items = (
-        timestamped_items | 'window' >> beam.WindowInto(
-            beam.window.FixedWindows(60)))
-    summed = (fixed_windowed_items
-              | 'group' >> beam.GroupByKey()
-              | 'combine' >> beam.CombineValues(sum))
-    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
-    assert_that(unkeyed, equal_to([42, 187]))
-    p.run()
+        def process(self, element):
+          # Extract the numeric Unix seconds-since-epoch timestamp to be
+          # associated with the current log entry.
+          unix_timestamp = extract_timestamp_from_log_entry(element)
+          # Wrap and emit the current entry and new timestamp in a
+          # TimestampedValue.
+          yield beam.window.TimestampedValue(element, unix_timestamp)
+
+      timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
+      # [END setting_timestamp]
+      fixed_windowed_items = (
+          timestamped_items | 'window' >> beam.WindowInto(
+              beam.window.FixedWindows(60)))
+      summed = (fixed_windowed_items
+                | 'group' >> beam.GroupByKey()
+                | 'combine' >> beam.CombineValues(sum))
+      unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+      assert_that(unkeyed, equal_to([42, 187]))
 
 
 class PTransformTest(unittest.TestCase):
@@ -919,10 +910,9 @@ class PTransformTest(unittest.TestCase):
         return pcoll | beam.Map(lambda x: len(x))
     # [END model_composite_transform]
 
-    p = TestPipeline()
-    lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
-    assert_that(lengths, equal_to([1, 2, 3]))
-    p.run()
+    with TestPipeline() as p:
+      lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths()
+      assert_that(lengths, equal_to([1, 2, 3]))
 
 
 if __name__ == '__main__':
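
In the test conversions above, the assert_that calls move inside the
with block and the trailing p.run() disappears. assert_that builds a
verification transform into the pipeline rather than checking anything
immediately, so the checks fire when the pipeline runs on exiting the
block. A rough sketch of the converted test shape (hypothetical values):

  with TestPipeline() as p:
    result = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)
    # Registers a deferred check; it is evaluated when the pipeline
    # runs at the end of the with block.
    assert_that(result, equal_to([2, 3, 4]))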

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index d0cc8a2..ce43e1f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -41,22 +41,20 @@ def run(argv=None):
       help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
+  with beam.Pipeline(argv=pipeline_args) as p:
 
-  # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read(
-      beam.io.PubSubSource(known_args.input_topic))
+    # Read the text file[pattern] into a PCollection.
+    lines = p | beam.io.Read(
+        beam.io.PubSubSource(known_args.input_topic))
 
-  # Capitalize the characters in each line.
-  transformed = (lines
-                 | 'capitalize' >> (beam.Map(lambda x: x.upper())))
+    # Capitalize the characters in each line.
+    transformed = (lines
+                   | 'capitalize' >> (beam.Map(lambda x: x.upper())))
 
-  # Write to PubSub.
-  # pylint: disable=expression-not-assigned
-  transformed | beam.io.Write(
-      beam.io.PubSubSink(known_args.output_topic))
-
-  p.run().wait_until_finish()
+    # Write to PubSub.
+    # pylint: disable=expression-not-assigned
+    transformed | beam.io.Write(
+        beam.io.PubSubSink(known_args.output_topic))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 4b6aecc..e9d5dbe 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -44,29 +44,27 @@ def run(argv=None):
       help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
-
-  # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(
-      beam.io.PubSubSource(known_args.input_topic))
-
-  # Capitalize the characters in each line.
-  transformed = (lines
-                 | 'Split' >> (
-                     beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-                     .with_output_types(unicode))
-                 | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-                 | beam.WindowInto(window.FixedWindows(15, 0))
-                 | 'Group' >> beam.GroupByKey()
-                 | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-                 | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
-
-  # Write to PubSub.
-  # pylint: disable=expression-not-assigned
-  transformed | 'pubsub_write' >> beam.io.Write(
-      beam.io.PubSubSink(known_args.output_topic))
-
-  p.run().wait_until_finish()
+  with beam.Pipeline(argv=pipeline_args) as p:
+
+    # Read the text file[pattern] into a PCollection.
+    lines = p | 'read' >> beam.io.Read(
+        beam.io.PubSubSource(known_args.input_topic))
+
+    # Capitalize the characters in each line.
+    transformed = (lines
+                   | 'Split' >> (
+                       beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                       .with_output_types(unicode))
+                   | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+                   | beam.WindowInto(window.FixedWindows(15, 0))
+                   | 'Group' >> beam.GroupByKey()
+                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                   | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
+
+    # Write to PubSub.
+    # pylint: disable=expression-not-assigned
+    transformed | 'pubsub_write' >> beam.io.Write(
+        beam.io.PubSubSink(known_args.output_topic))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e7e542a..34dedb2 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -102,7 +102,6 @@ def run(argv=None):
   # pylint: disable=expression-not-assigned
   output | 'write' >> WriteToText(known_args.output)
 
-  # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   result.wait_until_finish()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ca9f7b6..c0ffd35 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -118,35 +118,32 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Read the text file[pattern] into a PCollection, count the occurrences of
-  # each word and filter by a list of words.
-  filtered_words = (
-      p | 'read' >> ReadFromText(known_args.input)
-      | CountWords()
-      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
-
-  # assert_that is a convenient PTransform that checks a PCollection has an
-  # expected value. Asserts are best used in unit tests with small data sets but
-  # is demonstrated here as a teaching tool.
-  #
-  # Note assert_that does not provide any output and that successful completion
-  # of the Pipeline implies that the expectations were  met. Learn more at
-  # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
-  # test your pipeline.
-  assert_that(
-      filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
-
-  # Format the counts into a PCollection of strings and write the output using a
-  # "Write" transform that has side effects.
-  # pylint: disable=unused-variable
-  output = (filtered_words
-            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | 'write' >> WriteToText(known_args.output))
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    # Read the text file[pattern] into a PCollection, count the occurrences of
+    # each word and filter by a list of words.
+    filtered_words = (
+        p | 'read' >> ReadFromText(known_args.input)
+        | CountWords()
+        | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
+
+    # assert_that is a convenient PTransform that checks a PCollection has an
+    # expected value. Asserts are best used in unit tests with small data sets
+    # but is demonstrated here as a teaching tool.
+    #
+    # Note assert_that does not provide any output and that successful
+    # completion of the Pipeline implies that the expectations were  met. Learn
+    # more at https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
+    # on how to best test your pipeline.
+    assert_that(
+        filtered_words, equal_to([('Flourish', 3), ('stomach', 1)]))
+
+    # Format the counts into a PCollection of strings and write the output using
+    # a "Write" transform that has side effects.
+    # pylint: disable=unused-variable
+    output = (filtered_words
+              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+              | 'write' >> WriteToText(known_args.output))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 5109c08..76b0a22 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -92,28 +92,25 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(known_args.input)
+    # Read the text file[pattern] into a PCollection.
+    lines = p | ReadFromText(known_args.input)
 
-  # Count the occurrences of each word.
-  counts = (lines
-            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-                          .with_output_types(unicode))
-            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
-            | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+    # Count the occurrences of each word.
+    counts = (
+        lines
+        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                      .with_output_types(unicode))
+        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+        | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
-  # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+    # Format the counts into a PCollection of strings.
+    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
 
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(known_args.output)
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run().wait_until_finish()
+    # Write the output using a "Write" transform that has side effects.
+    # pylint: disable=expression-not-assigned
+    output | WriteToText(known_args.output)
 
 
 if __name__ == '__main__':
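
Beyond the with conversion, this hunk also replaces the GroupByKey plus
per-key summing Map with a single CombinePerKey(sum). The two forms
compute the same per-word totals, but CombinePerKey lets a runner apply
combiner lifting (partial sums before the shuffle) instead of
materializing each word's full iterable of ones. A sketch of the
equivalence, with pairs standing in for the (word, 1) PCollection
(Python 2 tuple-parameter lambda, as in the surrounding code):

  # Old shape: group, then sum each key's iterable of ones.
  counts = (pairs
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

  # New shape: combine per key directly.
  counts = pairs | 'GroupAndSum' >> beam.CombinePerKey(sum)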

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/io/filebasedsink_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 1f6aeee..7c8ddb4 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -146,9 +146,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
     sink = MyFileBasedSink(
         temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
     )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     self.assertEqual(
         open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
 
@@ -160,9 +159,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
         file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
         coder=coders.ToStringCoder()
     )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     self.assertEqual(
         open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
 
@@ -174,10 +172,8 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
-    p = TestPipeline()
-    p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-
-    p.run()
+    with TestPipeline() as p:
+      p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
 
     concat = ''.join(
         open(temp_path + '_03_%03d_.output' % shard_num).read()

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5048534..9093abf 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -28,19 +28,18 @@ to be executed for each node visited is specified through a runner object.
 Typical usage:
 
   # Create a pipeline object using a local runner for execution.
-  p = beam.Pipeline('DirectRunner')
+  with beam.Pipeline('DirectRunner') as p:
 
-  # Add to the pipeline a "Create" transform. When executed this
-  # transform will produce a PCollection object with the specified values.
-  pcoll = p | 'Create' >> beam.Create([1, 2, 3])
+    # Add to the pipeline a "Create" transform. When executed this
+    # transform will produce a PCollection object with the specified values.
+    pcoll = p | 'Create' >> beam.Create([1, 2, 3])
 
-  # Another transform could be applied to pcoll, e.g., writing to a text file.
-  # For other transforms, refer to transforms/ directory.
-  pcoll | 'Write' >> beam.io.WriteToText('./output')
+    # Another transform could be applied to pcoll, e.g., writing to a text file.
+    # For other transforms, refer to transforms/ directory.
+    pcoll | 'Write' >> beam.io.WriteToText('./output')
 
-  # run() will execute the DAG stored in the pipeline.  The execution of the
-  # nodes visited is done using the specified local runner.
-  p.run()
+    # run() will execute the DAG stored in the pipeline.  The execution of the
+    # nodes visited is done using the specified local runner.
 
 """
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 946a60a..c79fec8 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -247,26 +247,23 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_tuple_combine_fn(self):
-    p = TestPipeline()
-    result = (
-        p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
-        | beam.CombineGlobally(combine.TupleCombineFn(max,
-                                                      combine.MeanCombineFn(),
-                                                      sum)).without_defaults())
-    assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
-    p.run()
+    with TestPipeline() as p:
+      result = (
+          p
+          | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+          | beam.CombineGlobally(combine.TupleCombineFn(
+              max, combine.MeanCombineFn(), sum)).without_defaults())
+      assert_that(result, equal_to([('c', 111.0 / 3, 99.0)]))
 
   def test_tuple_combine_fn_without_defaults(self):
-    p = TestPipeline()
-    result = (
-        p
-        | Create([1, 1, 2, 3])
-        | beam.CombineGlobally(
-            combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
-            .with_common_input()).without_defaults())
-    assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
-    p.run()
+    with TestPipeline() as p:
+      result = (
+          p
+          | Create([1, 1, 2, 3])
+          | beam.CombineGlobally(
+              combine.TupleCombineFn(min, combine.MeanCombineFn(), max)
+              .with_common_input()).without_defaults())
+      assert_that(result, equal_to([(1, 7.0 / 4, 3)]))
 
   def test_to_list_and_to_dict(self):
     pipeline = TestPipeline()
@@ -295,29 +292,26 @@ class CombineTest(unittest.TestCase):
     pipeline.run()
 
   def test_combine_globally_with_default(self):
-    p = TestPipeline()
-    assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
-    p.run()
+    with TestPipeline() as p:
+      assert_that(p | Create([]) | CombineGlobally(sum), equal_to([0]))
 
   def test_combine_globally_without_default(self):
-    p = TestPipeline()
-    result = p | Create([]) | CombineGlobally(sum).without_defaults()
-    assert_that(result, equal_to([]))
-    p.run()
+    with TestPipeline() as p:
+      result = p | Create([]) | CombineGlobally(sum).without_defaults()
+      assert_that(result, equal_to([]))
 
   def test_combine_globally_with_default_side_input(self):
-    class CombineWithSideInput(PTransform):
+    class SideInputCombine(PTransform):
       def expand(self, pcoll):
         side = pcoll | CombineGlobally(sum).as_singleton_view()
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)
 
-    p = TestPipeline()
-    result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
-    result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
-    assert_that(result1, equal_to([0]), label='r1')
-    assert_that(result2, equal_to([10]), label='r2')
-    p.run()
+    with TestPipeline() as p:
+      result1 = p | 'i1' >> Create([]) | 'c1' >> SideInputCombine()
+      result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> SideInputCombine()
+      assert_that(result1, equal_to([0]), label='r1')
+      assert_that(result2, equal_to([10]), label='r2')
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index fd1bb9d..977a364 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -167,90 +167,85 @@ class WindowTest(unittest.TestCase):
             | Map(lambda x: WindowedValue((key, x), x, [GlobalWindow()])))
 
   def test_sliding_windows(self):
-    p = TestPipeline()
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
-    result = (pcoll
-              | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
-              | GroupByKey()
-              | reify_windows)
-    expected = [('key @ [-2.0, 2.0)', [1]),
-                ('key @ [0.0, 4.0)', [1, 2, 3]),
-                ('key @ [2.0, 6.0)', [2, 3])]
-    assert_that(result, equal_to(expected))
-    p.run()
+    with TestPipeline() as p:
+      pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
+      result = (pcoll
+                | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
+                | GroupByKey()
+                | reify_windows)
+      expected = [('key @ [-2.0, 2.0)', [1]),
+                  ('key @ [0.0, 4.0)', [1, 2, 3]),
+                  ('key @ [2.0, 6.0)', [2, 3])]
+      assert_that(result, equal_to(expected))
 
   def test_sessions(self):
-    p = TestPipeline()
-    pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
-    result = (pcoll
-              | 'w' >> WindowInto(Sessions(10))
-              | GroupByKey()
-              | sort_values
-              | reify_windows)
-    expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
-                ('key @ [20.0, 45.0)', [20, 27, 35])]
-    assert_that(result, equal_to(expected))
-    p.run()
+    with TestPipeline() as p:
+      pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
+      result = (pcoll
+                | 'w' >> WindowInto(Sessions(10))
+                | GroupByKey()
+                | sort_values
+                | reify_windows)
+      expected = [('key @ [1.0, 13.0)', [1, 2, 3]),
+                  ('key @ [20.0, 45.0)', [20, 27, 35])]
+      assert_that(result, equal_to(expected))
 
   def test_timestamped_value(self):
-    p = TestPipeline()
-    result = (p
-              | 'start' >> Create([(k, k) for k in range(10)])
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              | 'w' >> WindowInto(FixedWindows(5))
-              | Map(lambda v: ('key', v))
-              | GroupByKey())
-    assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
-                                  ('key', [5, 6, 7, 8, 9])]))
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                | 'start' >> Create([(k, k) for k in range(10)])
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                | 'w' >> WindowInto(FixedWindows(5))
+                | Map(lambda v: ('key', v))
+                | GroupByKey())
+      assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
+                                    ('key', [5, 6, 7, 8, 9])]))
 
   def test_rewindow(self):
-    p = TestPipeline()
-    result = (p
-              | Create([(k, k) for k in range(10)])
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
-              # Per the model, each element is now duplicated across
-              # three windows. Rewindowing must preserve this duplication.
-              | 'rewindow' >> WindowInto(FixedWindows(5))
-              | 'rewindow2' >> WindowInto(FixedWindows(5))
-              | Map(lambda v: ('key', v))
-              | GroupByKey())
-    assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
-                                  ('key', sorted([5, 6, 7, 8, 9] * 3))]))
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                | Create([(k, k) for k in range(10)])
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
+                # Per the model, each element is now duplicated across
+                # three windows. Rewindowing must preserve this duplication.
+                | 'rewindow' >> WindowInto(FixedWindows(5))
+                | 'rewindow2' >> WindowInto(FixedWindows(5))
+                | Map(lambda v: ('key', v))
+                | GroupByKey())
+      assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
+                                    ('key', sorted([5, 6, 7, 8, 9] * 3))]))
 
   def test_timestamped_with_combiners(self):
-    p = TestPipeline()
-    result = (p
-              # Create some initial test values.
-              | 'start' >> Create([(k, k) for k in range(10)])
-              # The purpose of the WindowInto transform is to establish a
-              # FixedWindows windowing function for the PCollection.
-              # It does not bucket elements into windows since the timestamps
-              # from Create are not spaced 5 ms apart and very likely they all
-              # fall into the same window.
-              | 'w' >> WindowInto(FixedWindows(5))
-              # Generate timestamped values using the values as timestamps.
-              # Now there are values 5 ms apart and since Map propagates the
-              # windowing function from input to output the output PCollection
-              # will have elements falling into different 5ms windows.
-              | Map(lambda (x, t): TimestampedValue(x, t))
-              # We add a 'key' to each value representing the index of the
-              # window. This is important since there is no guarantee of
-              # order for the elements of a PCollection.
-              | Map(lambda v: (v / 5, v)))
-    # Sum all elements associated with a key and window. Although it
-    # is called CombinePerKey it is really CombinePerKeyAndWindow the
-    # same way GroupByKey is really GroupByKeyAndWindow.
-    sum_per_window = result | CombinePerKey(sum)
-    # Compute mean per key and window.
-    mean_per_window = result | combiners.Mean.PerKey()
-    assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
-                label='assert:sum')
-    assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
-                label='assert:mean')
-    p.run()
+    with TestPipeline() as p:
+      result = (p
+                # Create some initial test values.
+                | 'start' >> Create([(k, k) for k in range(10)])
+                # The purpose of the WindowInto transform is to establish a
+                # FixedWindows windowing function for the PCollection.
+                # It does not bucket elements into windows since the timestamps
+                # from Create are not spaced 5 ms apart and very likely they all
+                # fall into the same window.
+                | 'w' >> WindowInto(FixedWindows(5))
+                # Generate timestamped values using the values as timestamps.
+                # Now there are values 5 ms apart and since Map propagates the
+                # windowing function from input to output the output PCollection
+                # will have elements falling into different 5ms windows.
+                | Map(lambda (x, t): TimestampedValue(x, t))
+                # We add a 'key' to each value representing the index of the
+                # window. This is important since there is no guarantee of
+                # order for the elements of a PCollection.
+                | Map(lambda v: (v / 5, v)))
+      # Sum all elements associated with a key and window. Although it
+      # is called CombinePerKey it is really CombinePerKeyAndWindow the
+      # same way GroupByKey is really GroupByKeyAndWindow.
+      sum_per_window = result | CombinePerKey(sum)
+      # Compute mean per key and window.
+      mean_per_window = result | combiners.Mean.PerKey()
+      assert_that(sum_per_window, equal_to([(0, 10), (1, 35)]),
+                  label='assert:sum')
+      assert_that(mean_per_window, equal_to([(0, 2.0), (1, 7.0)]),
+                  label='assert:mean')
 
 
 class RunnerApiTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index e31b9cc..50f0deb 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -98,11 +98,10 @@ class WriteTest(unittest.TestCase):
                       return_write_results=True):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
-    p = TestPipeline()
-    result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
+    with TestPipeline() as p:
+      result = p | beam.Create(data) | write_to_test_sink | beam.Map(list)
 
-    assert_that(result, is_empty())
-    p.run()
+      assert_that(result, is_empty())
 
     sink = write_to_test_sink.last_sink
     self.assertIsNotNone(sink)

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 589dc0e..c81ef32 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -168,12 +168,11 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
       return s * times
-    p = TestPipeline()
-    main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | 'side' >> beam.Create([3])
-    result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
-    assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
-    p.run()
+    with TestPipeline() as p:
+      main_input = p | beam.Create(['a', 'bb', 'c'])
+      side_input = p | 'side' >> beam.Create([3])
+      result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
+      assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
 
     bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
@@ -183,12 +182,11 @@ class SideInputTest(unittest.TestCase):
     @typehints.with_input_types(str, typehints.Iterable[str])
     def concat(glue, items):
       return glue.join(sorted(items))
-    p = TestPipeline()
-    main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
-    result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
-    assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
-    p.run()
+    with TestPipeline() as p:
+      main_input = p | beam.Create(['a', 'bb', 'c'])
+      side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
+      result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
+      assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
 
     bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
     with self.assertRaises(typehints.TypeCheckError):


[2/3] beam git commit: Automatically convert examples to use with syntax.

Posted by ro...@apache.org.
Automatically convert examples to use with syntax.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a62b4f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a62b4f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a62b4f7

Branch: refs/heads/master
Commit: 3a62b4f7b20bda2b3c4ca648f90988d387cfe20d
Parents: d0601b3
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 18 17:22:25 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 23 10:19:04 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |  19 +-
 .../examples/complete/autocomplete_test.py      |  31 +-
 .../examples/complete/estimate_pi.py            |  11 +-
 .../examples/complete/estimate_pi_test.py       |  12 +-
 .../examples/complete/game/hourly_team_score.py |  19 +-
 .../examples/complete/game/user_score.py        |  15 +-
 .../complete/juliaset/juliaset/juliaset.py      |  44 +-
 .../apache_beam/examples/complete/tfidf.py      |  21 +-
 .../apache_beam/examples/complete/tfidf_test.py |  28 +-
 .../examples/complete/top_wikipedia_sessions.py |  12 +-
 .../complete/top_wikipedia_sessions_test.py     |   9 +-
 .../examples/cookbook/bigquery_schema.py        | 159 +++---
 .../examples/cookbook/bigquery_side_input.py    |  51 +-
 .../cookbook/bigquery_side_input_test.py        |  39 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  33 +-
 .../cookbook/bigquery_tornadoes_test.py         |  19 +-
 .../apache_beam/examples/cookbook/coders.py     |  16 +-
 .../examples/cookbook/coders_test.py            |  14 +-
 .../examples/cookbook/custom_ptransform.py      |  27 +-
 .../examples/cookbook/custom_ptransform_test.py |  11 +-
 .../examples/cookbook/datastore_wordcount.py    |  20 +-
 .../apache_beam/examples/cookbook/filters.py    |  21 +-
 .../examples/cookbook/group_with_coder.py       |  43 +-
 .../examples/cookbook/group_with_coder_test.py  |   4 +-
 .../examples/cookbook/mergecontacts.py          | 115 +++--
 .../examples/cookbook/mergecontacts_test.py     |   3 +-
 .../examples/cookbook/multiple_output_pardo.py  |  72 ++-
 .../cookbook/multiple_output_pardo_test.py      |   2 +-
 .../apache_beam/examples/snippets/snippets.py   | 494 +++++++++----------
 .../examples/snippets/snippets_test.py          | 326 ++++++------
 .../apache_beam/examples/streaming_wordcap.py   |  24 +-
 .../apache_beam/examples/streaming_wordcount.py |  44 +-
 sdks/python/apache_beam/examples/wordcount.py   |   1 -
 .../apache_beam/examples/wordcount_debugging.py |  55 +--
 .../apache_beam/examples/wordcount_minimal.py   |  33 +-
 .../python/apache_beam/io/filebasedsink_test.py |  16 +-
 sdks/python/apache_beam/pipeline.py             |  19 +-
 .../apache_beam/transforms/combiners_test.py    |  58 +--
 .../apache_beam/transforms/window_test.py       | 147 +++---
 .../transforms/write_ptransform_test.py         |   7 +-
 .../typehints/typed_pipeline_test.py            |  22 +-
 41 files changed, 1005 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index f0acc3f..ab3397c 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -44,16 +44,15 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  (p  # pylint: disable=expression-not-assigned
-   | 'read' >> ReadFromText(known_args.input)
-   | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-   | 'TopPerPrefix' >> TopPerPrefix(5)
-   | 'format' >> beam.Map(
-       lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | 'write' >> WriteToText(known_args.output))
-  p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    (p  # pylint: disable=expression-not-assigned
+     | 'read' >> ReadFromText(known_args.input)
+     | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+     | 'TopPerPrefix' >> TopPerPrefix(5)
+     | 'format' >> beam.Map(
+         lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+     | 'write' >> WriteToText(known_args.output))
 
 
 class TopPerPrefix(beam.PTransform):

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 378d222..e2c84d6 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,22 +31,21 @@ class AutocompleteTest(unittest.TestCase):
   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
 
   def test_top_prefixes(self):
-    p = TestPipeline()
-    words = p | beam.Create(self.WORDS)
-    result = words | autocomplete.TopPerPrefix(5)
-    # values must be hashable for now
-    result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
-    assert_that(result, equal_to(
-        [
-            ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
-            ('to', ((3, 'to'), )),
-            ('th', ((2, 'this'), (1, 'that'))),
-            ('thi', ((2, 'this'), )),
-            ('this', ((2, 'this'), )),
-            ('tha', ((1, 'that'), )),
-            ('that', ((1, 'that'), )),
-        ]))
-    p.run()
+    with TestPipeline() as p:
+      words = p | beam.Create(self.WORDS)
+      result = words | autocomplete.TopPerPrefix(5)
+      # values must be hashable for now
+      result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
+      assert_that(result, equal_to(
+          [
+              ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
+              ('to', ((3, 'to'), )),
+              ('th', ((2, 'this'), (1, 'that'))),
+              ('thi', ((2, 'this'), )),
+              ('this', ((2, 'this'), )),
+              ('tha', ((1, 'that'), )),
+              ('that', ((1, 'that'), )),
+          ]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c709713..7e3c4cd 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -113,14 +113,11 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  (p  # pylint: disable=expression-not-assigned
-   | EstimatePiTransform()
-   | WriteToText(known_args.output, coder=JsonCoder()))
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run()
+    (p  # pylint: disable=expression-not-assigned
+     | EstimatePiTransform()
+     | WriteToText(known_args.output, coder=JsonCoder()))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index fd51309..f1cbb0a 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -38,13 +38,13 @@ def in_between(lower, upper):
 class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
-    p = TestPipeline()
-    result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000)
+    with TestPipeline() as p:
+      result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000)
 
-    # Note: Probabilistically speaking this test can fail with a probability
-    # that is very small (VERY) given that we run at least 500 thousand trials.
-    assert_that(result, in_between(3.125, 3.155))
-    p.run()
+      # Note: Probabilistically speaking this test can fail with a probability
+      # that is very small (VERY) given that we run at least 500 thousand
+      # trials.
+      assert_that(result, in_between(3.125, 3.155))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index e9d7188..9f398d9 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -276,18 +276,15 @@ def run(argv=None):
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   pipeline_options = PipelineOptions(pipeline_args)
-  p = beam.Pipeline(options=pipeline_options)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input)
-   | HourlyTeamScore(
-       known_args.start_min, known_args.stop_min, known_args.window_duration)
-   | WriteWindowedToBigQuery(
-       known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
-  result = p.run()
-  result.wait_until_finish()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    (p  # pylint: disable=expression-not-assigned
+     | ReadFromText(known_args.input)
+     | HourlyTeamScore(
+         known_args.start_min, known_args.stop_min, known_args.window_duration)
+     | WriteWindowedToBigQuery(
+         known_args.table_name, known_args.dataset, configure_bigquery_write()))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 389d2c6..c9f2738 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -201,16 +201,13 @@ def run(argv=None):
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   pipeline_options = PipelineOptions(pipeline_args)
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input) # Read events from a file and parse them.
-   | UserScore()
-   | WriteToBigQuery(
-       known_args.table_name, known_args.dataset, configure_bigquery_write()))
-
-  result = p.run()
-  result.wait_until_finish()
+    (p  # pylint: disable=expression-not-assigned
+     | ReadFromText(known_args.input) # Read events from a file and parse them.
+     | UserScore()
+     | WriteToBigQuery(
+         known_args.table_name, known_args.dataset, configure_bigquery_write()))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 5ff2b78..61e3fd1 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -99,26 +99,24 @@ def run(argv=None):  # pylint: disable=missing-docstring
                       help='Output file to write the resulting image to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
-  n = int(known_args.grid_size)
-
-  coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
-
-  # Group each coordinate triplet by its x value, then write the coordinates to
-  # the output file with an x-coordinate grouping per line.
-  # pylint: disable=expression-not-assigned
-  (coordinates
-   | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
-   | 'x coord' >> beam.GroupByKey()
-   | 'format' >> beam.Map(
-       lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | WriteToText(known_args.coordinate_output))
-  # pylint: enable=expression-not-assigned
-  return p.run().wait_until_finish()
-
-  # Optionally render the image and save it to a file.
-  # TODO(silviuc): Add this functionality.
-  # if p.options.image_output is not None:
-  #  julia_set_image = generate_julia_set_visualization(
-  #      file_with_coordinates, n, 100)
-  #  save_julia_set_visualization(p.options.image_output, julia_set_image)
+  with beam.Pipeline(argv=pipeline_args) as p:
+    n = int(known_args.grid_size)
+
+    coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
+
+    # Group each coordinate triplet by its x value, then write the coordinates
+    # to the output file with an x-coordinate grouping per line.
+    # pylint: disable=expression-not-assigned
+    (coordinates
+     | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+     | 'x coord' >> beam.GroupByKey()
+     | 'format' >> beam.Map(
+         lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords))
+     | WriteToText(known_args.coordinate_output))
+
+    # Optionally render the image and save it to a file.
+    # TODO(silviuc): Add this functionality.
+    # if p.options.image_output is not None:
+    #  julia_set_image = generate_julia_set_visualization(
+    #      file_with_coordinates, n, 100)
+    #  save_julia_set_visualization(p.options.image_output, julia_set_image)

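One portability caveat in the hunk above: lambda (x, y, i) and lambda
(k, coords) use Python 2 tuple parameter unpacking, which PEP 3113 removed in
Python 3. That is fine for this Python 2 SDK, but an equivalent spelling that
runs on both, offered as a hedged sketch:

import apache_beam as beam

# Python 2 only (as in this file):
#   beam.Map(lambda (x, y, i): (x, (x, y, i)))
# Portable equivalent: accept one argument and index into it.
x_coord_key = 'x coord key' >> beam.Map(lambda t: (t[0], t))
fmt = 'format' >> beam.Map(
    lambda kv: ' '.join('(%s, %s, %s)' % c for c in kv[1]))
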
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index a98d906..a88ff82 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -191,17 +191,16 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Read documents specified by the uris command line option.
-  pcoll = read_documents(p, glob.glob(known_args.uris))
-  # Compute TF-IDF information for each word.
-  output = pcoll | TfIdf()
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(known_args.output)
-  # Execute the pipeline and wait until it is completed.
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    # Read documents specified by the uris command line option.
+    pcoll = read_documents(p, glob.glob(known_args.uris))
+    # Compute TF-IDF information for each word.
+    output = pcoll | TfIdf()
+    # Write the output using a "Write" transform that has side effects.
+    # pylint: disable=expression-not-assigned
+    output | 'write' >> WriteToText(known_args.output)
+    # The pipeline runs and waits for completion when the with block exits.
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index f177dfc..322426f 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -50,20 +50,20 @@ class TfIdfTest(unittest.TestCase):
       f.write(contents)
 
   def test_tfidf_transform(self):
-    p = TestPipeline()
-    uri_to_line = p | 'create sample' >> beam.Create(
-        [('1.txt', 'abc def ghi'),
-         ('2.txt', 'abc def'),
-         ('3.txt', 'abc')])
-    result = (
-        uri_to_line
-        | tfidf.TfIdf()
-        | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
-    assert_that(result, equal_to(EXPECTED_RESULTS))
-    # Run the pipeline. Note that the assert_that above adds to the pipeline
-    # a check that the result PCollection contains expected values. To actually
-    # trigger the check the pipeline must be run.
-    p.run()
+    with TestPipeline() as p:
+      uri_to_line = p | 'create sample' >> beam.Create(
+          [('1.txt', 'abc def ghi'),
+           ('2.txt', 'abc def'),
+           ('3.txt', 'abc')])
+      result = (
+          uri_to_line
+          | tfidf.TfIdf()
+          | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+      assert_that(result, equal_to(EXPECTED_RESULTS))
+      # Run the pipeline. Note that the assert_that above adds to the pipeline
+      # a check that the result PCollection contains expected values.
+      # To actually trigger the check, the pipeline must be run (e.g. by
+      # exiting the with context).
 
   def test_basics(self):
     # Setup the files with expected content.

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index aa48e4e..9a9ad78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -159,14 +159,12 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input)
-   | ComputeTopSessions(known_args.sampling_threshold)
-   | WriteToText(known_args.output))
-
-  p.run()
+    (p  # pylint: disable=expression-not-assigned
+     | ReadFromText(known_args.input)
+     | ComputeTopSessions(known_args.sampling_threshold)
+     | WriteToText(known_args.output))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 5fb6276..ced8a44 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -52,12 +52,11 @@ class ComputeTopSessionsTest(unittest.TestCase):
   ]
 
   def test_compute_top_sessions(self):
-    p = TestPipeline()
-    edits = p | beam.Create(self.EDITS)
-    result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
+    with TestPipeline() as p:
+      edits = p | beam.Create(self.EDITS)
+      result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
-    assert_that(result, equal_to(self.EXPECTED))
-    p.run()
+      assert_that(result, equal_to(self.EXPECTED))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 400189e..3a8af67 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -42,86 +42,85 @@ def run(argv=None):
        'or DATASET.TABLE.'))
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
-
-  from apache_beam.io.gcp.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position
-
-  table_schema = bigquery.TableSchema()
-
-  # Fields that use standard types.
-  kind_schema = bigquery.TableFieldSchema()
-  kind_schema.name = 'kind'
-  kind_schema.type = 'string'
-  kind_schema.mode = 'nullable'
-  table_schema.fields.append(kind_schema)
-
-  full_name_schema = bigquery.TableFieldSchema()
-  full_name_schema.name = 'fullName'
-  full_name_schema.type = 'string'
-  full_name_schema.mode = 'required'
-  table_schema.fields.append(full_name_schema)
-
-  age_schema = bigquery.TableFieldSchema()
-  age_schema.name = 'age'
-  age_schema.type = 'integer'
-  age_schema.mode = 'nullable'
-  table_schema.fields.append(age_schema)
-
-  gender_schema = bigquery.TableFieldSchema()
-  gender_schema.name = 'gender'
-  gender_schema.type = 'string'
-  gender_schema.mode = 'nullable'
-  table_schema.fields.append(gender_schema)
-
-  # A nested field
-  phone_number_schema = bigquery.TableFieldSchema()
-  phone_number_schema.name = 'phoneNumber'
-  phone_number_schema.type = 'record'
-  phone_number_schema.mode = 'nullable'
-
-  area_code = bigquery.TableFieldSchema()
-  area_code.name = 'areaCode'
-  area_code.type = 'integer'
-  area_code.mode = 'nullable'
-  phone_number_schema.fields.append(area_code)
-
-  number = bigquery.TableFieldSchema()
-  number.name = 'number'
-  number.type = 'integer'
-  number.mode = 'nullable'
-  phone_number_schema.fields.append(number)
-  table_schema.fields.append(phone_number_schema)
-
-  # A repeated field.
-  children_schema = bigquery.TableFieldSchema()
-  children_schema.name = 'children'
-  children_schema.type = 'string'
-  children_schema.mode = 'repeated'
-  table_schema.fields.append(children_schema)
-
-  def create_random_record(record_id):
-    return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
-            'age': int(record_id) * 10, 'gender': 'male',
-            'phoneNumber': {
-                'areaCode': int(record_id) * 100,
-                'number': int(record_id) * 100000},
-            'children': ['child' + record_id + '1',
-                         'child' + record_id + '2',
-                         'child' + record_id + '3']
-           }
-
-  # pylint: disable=expression-not-assigned
-  record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
-  records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
-  records | 'write' >> beam.io.Write(
-      beam.io.BigQuerySink(
-          known_args.output,
-          schema=table_schema,
-          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
-
-  # Run the pipeline (all operations are deferred until run() is called).
-  p.run()
+  with beam.Pipeline(argv=pipeline_args) as p:
+
+    from apache_beam.io.gcp.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position
+
+    table_schema = bigquery.TableSchema()
+
+    # Fields that use standard types.
+    kind_schema = bigquery.TableFieldSchema()
+    kind_schema.name = 'kind'
+    kind_schema.type = 'string'
+    kind_schema.mode = 'nullable'
+    table_schema.fields.append(kind_schema)
+
+    full_name_schema = bigquery.TableFieldSchema()
+    full_name_schema.name = 'fullName'
+    full_name_schema.type = 'string'
+    full_name_schema.mode = 'required'
+    table_schema.fields.append(full_name_schema)
+
+    age_schema = bigquery.TableFieldSchema()
+    age_schema.name = 'age'
+    age_schema.type = 'integer'
+    age_schema.mode = 'nullable'
+    table_schema.fields.append(age_schema)
+
+    gender_schema = bigquery.TableFieldSchema()
+    gender_schema.name = 'gender'
+    gender_schema.type = 'string'
+    gender_schema.mode = 'nullable'
+    table_schema.fields.append(gender_schema)
+
+    # A nested field
+    phone_number_schema = bigquery.TableFieldSchema()
+    phone_number_schema.name = 'phoneNumber'
+    phone_number_schema.type = 'record'
+    phone_number_schema.mode = 'nullable'
+
+    area_code = bigquery.TableFieldSchema()
+    area_code.name = 'areaCode'
+    area_code.type = 'integer'
+    area_code.mode = 'nullable'
+    phone_number_schema.fields.append(area_code)
+
+    number = bigquery.TableFieldSchema()
+    number.name = 'number'
+    number.type = 'integer'
+    number.mode = 'nullable'
+    phone_number_schema.fields.append(number)
+    table_schema.fields.append(phone_number_schema)
+
+    # A repeated field.
+    children_schema = bigquery.TableFieldSchema()
+    children_schema.name = 'children'
+    children_schema.type = 'string'
+    children_schema.mode = 'repeated'
+    table_schema.fields.append(children_schema)
+
+    def create_random_record(record_id):
+      return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
+              'age': int(record_id) * 10, 'gender': 'male',
+              'phoneNumber': {
+                  'areaCode': int(record_id) * 100,
+                  'number': int(record_id) * 100000},
+              'children': ['child' + record_id + '1',
+                           'child' + record_id + '2',
+                           'child' + record_id + '3']
+             }
+
+    # pylint: disable=expression-not-assigned
+    record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+    records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
+    records | 'write' >> beam.io.Write(
+        beam.io.BigQuerySink(
+            known_args.output,
+            schema=table_schema,
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+    # Operations are deferred; the pipeline runs when the with block exits.
 
 
 if __name__ == '__main__':

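The schema construction above is correct but repetitive: every field takes the
same four statements. A possible refactor of exactly the code shown, hedged in
that make_field is an illustrative helper, not part of the Beam API:

from apache_beam.io.gcp.internal.clients import bigquery

def make_field(name, field_type, mode, children=()):
  """Builds one BigQuery TableFieldSchema entry."""
  field = bigquery.TableFieldSchema()
  field.name = name
  field.type = field_type
  field.mode = mode
  field.fields.extend(children)
  return field

table_schema = bigquery.TableSchema()
table_schema.fields.append(make_field('kind', 'string', 'nullable'))
table_schema.fields.append(make_field('fullName', 'string', 'required'))
table_schema.fields.append(make_field('age', 'integer', 'nullable'))
table_schema.fields.append(make_field('gender', 'string', 'nullable'))
table_schema.fields.append(make_field(
    'phoneNumber', 'record', 'nullable',
    children=[make_field('areaCode', 'integer', 'nullable'),
              make_field('number', 'integer', 'nullable')]))
table_schema.fields.append(make_field('children', 'string', 'repeated'))
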
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 6b28818..9911a67 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -88,32 +88,31 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  group_ids = []
-  for i in xrange(0, int(known_args.num_groups)):
-    group_ids.append('id' + str(i))
-
-  query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
-  query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
-  ignore_corpus = known_args.ignore_corpus
-  ignore_word = known_args.ignore_word
-
-  pcoll_corpus = p | 'read corpus' >> beam.io.Read(
-      beam.io.BigQuerySource(query=query_corpus))
-  pcoll_word = p | 'read_words' >> beam.io.Read(
-      beam.io.BigQuerySource(query=query_word))
-  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
-      [ignore_corpus])
-  pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
-  pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
-
-  pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
-                               pcoll_ignore_corpus, pcoll_ignore_word)
-
-  # pylint:disable=expression-not-assigned
-  pcoll_groups | WriteToText(known_args.output)
-  p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    group_ids = []
+    for i in xrange(0, int(known_args.num_groups)):
+      group_ids.append('id' + str(i))
+
+    query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
+    query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
+    ignore_corpus = known_args.ignore_corpus
+    ignore_word = known_args.ignore_word
+
+    pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+        beam.io.BigQuerySource(query=query_corpus))
+    pcoll_word = p | 'read_words' >> beam.io.Read(
+        beam.io.BigQuerySource(query=query_word))
+    pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+        [ignore_corpus])
+    pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+    pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
+
+    pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
+                                 pcoll_ignore_corpus, pcoll_ignore_word)
+
+    # pylint:disable=expression-not-assigned
+    pcoll_groups | WriteToText(known_args.output)
 
 
 if __name__ == '__main__':

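Minor readability note on the hunk above: the three-line group_ids
construction could be a single comprehension with the same behavior (keeping
xrange, since this SDK targets Python 2):

group_ids = ['id%d' % i for i in xrange(int(known_args.num_groups))]
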
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index b11dc47..964b35b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -30,25 +30,26 @@ from apache_beam.testing.util import equal_to
 class BigQuerySideInputTest(unittest.TestCase):
 
   def test_create_groups(self):
-    p = TestPipeline()
-
-    group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
-    corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
-        [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
-    words_pcoll = p | 'CreateWords' >> beam.Create(
-        [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
-    ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
-    ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
-
-    groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
-                                               words_pcoll, ignore_corpus_pcoll,
-                                               ignore_word_pcoll)
-
-    assert_that(groups, equal_to(
-        [('A', 'corpus2', 'word2'),
-         ('B', 'corpus2', 'word2'),
-         ('C', 'corpus2', 'word2')]))
-    p.run()
+    with TestPipeline() as p:
+
+      group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
+      corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
+          [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
+      words_pcoll = p | 'CreateWords' >> beam.Create(
+          [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+      ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
+      ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
+
+      groups = bigquery_side_input.create_groups(group_ids_pcoll,
+                                                 corpus_pcoll,
+                                                 words_pcoll,
+                                                 ignore_corpus_pcoll,
+                                                 ignore_word_pcoll)
+
+      assert_that(groups, equal_to(
+          [('A', 'corpus2', 'word2'),
+           ('B', 'corpus2', 'word2'),
+           ('C', 'corpus2', 'word2')]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index ed0c79a..d3b216e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -75,23 +75,22 @@ def run(argv=None):
        'or DATASET.TABLE.'))
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
-
-  # Read the table rows into a PCollection.
-  rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
-  counts = count_tornadoes(rows)
-
-  # Write the output using a "Write" transform that has side effects.
-  # pylint: disable=expression-not-assigned
-  counts | 'write' >> beam.io.Write(
-      beam.io.BigQuerySink(
-          known_args.output,
-          schema='month:INTEGER, tornado_count:INTEGER',
-          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
-
-  # Run the pipeline (all operations are deferred until run() is called).
-  p.run().wait_until_finish()
+  with beam.Pipeline(argv=pipeline_args) as p:
+
+    # Read the table rows into a PCollection.
+    rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
+    counts = count_tornadoes(rows)
+
+    # Write the output using a "Write" transform that has side effects.
+    # pylint: disable=expression-not-assigned
+    counts | 'write' >> beam.io.Write(
+        beam.io.BigQuerySink(
+            known_args.output,
+            schema='month:INTEGER, tornado_count:INTEGER',
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+    # Operations are deferred; the pipeline runs when the with block exits.
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index c926df8..45dcaba 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -30,16 +30,15 @@ from apache_beam.testing.util import equal_to
 class BigQueryTornadoesTest(unittest.TestCase):
 
   def test_basics(self):
-    p = TestPipeline()
-    rows = (p | 'create' >> beam.Create([
-        {'month': 1, 'day': 1, 'tornado': False},
-        {'month': 1, 'day': 2, 'tornado': True},
-        {'month': 1, 'day': 3, 'tornado': True},
-        {'month': 2, 'day': 1, 'tornado': True}]))
-    results = bigquery_tornadoes.count_tornadoes(rows)
-    assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
-                                   {'month': 2, 'tornado_count': 1}]))
-    p.run().wait_until_finish()
+    with TestPipeline() as p:
+      rows = (p | 'create' >> beam.Create([
+          {'month': 1, 'day': 1, 'tornado': False},
+          {'month': 1, 'day': 2, 'tornado': True},
+          {'month': 1, 'day': 3, 'tornado': True},
+          {'month': 2, 'day': 1, 'tornado': True}]))
+      results = bigquery_tornadoes.count_tornadoes(rows)
+      assert_that(results, equal_to([{'month': 1, 'tornado_count': 2},
+                                     {'month': 2, 'tornado_count': 1}]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index aeeb3c9..f97b0f2 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -85,15 +85,13 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  p = beam.Pipeline(argv=pipeline_args)
-  (p  # pylint: disable=expression-not-assigned
-   | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
-   | 'points' >> beam.FlatMap(compute_points)
-   | beam.CombinePerKey(sum)
-   | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
-  p.run()
+
+  with beam.Pipeline(options=pipeline_options) as p:
+    (p  # pylint: disable=expression-not-assigned
+     | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
+     | 'points' >> beam.FlatMap(compute_points)
+     | beam.CombinePerKey(sum)
+     | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
 
 
 if __name__ == '__main__':

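Both the read and the write above pass coder=JsonCoder(). For readers
unfamiliar with the Coder interface, a minimal sketch of what such a coder
typically looks like; hedged, since the real JsonCoder is defined in coders.py
and may differ in detail:

import json

import apache_beam as beam

class JsonCoder(beam.coders.Coder):
  """Encodes and decodes each record as one JSON string."""

  def encode(self, value):
    return json.dumps(value)

  def decode(self, encoded):
    return json.loads(encoded)
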
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index f71dad8..988d3c9 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,13 +35,13 @@ class CodersTest(unittest.TestCase):
       {'host': ['Brasil', 1], 'guest': ['Italy', 0]}]
 
   def test_compute_points(self):
-    p = TestPipeline()
-    records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
-    result = (records
-              | 'points' >> beam.FlatMap(coders.compute_points)
-              | beam.CombinePerKey(sum))
-    assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
-    p.run()
+    with TestPipeline() as p:
+      records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
+      result = (records
+                | 'points' >> beam.FlatMap(coders.compute_points)
+                | beam.CombinePerKey(sum))
+      assert_that(result,
+                  equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 609f2cd..aee69d2 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -47,11 +47,10 @@ class Count1(beam.PTransform):
 def run_count1(known_args, options):
   """Runs the first example pipeline."""
   logging.info('Running first pipeline')
-  p = beam.Pipeline(options=options)
-  (p | beam.io.ReadFromText(known_args.input)
-   | Count1()
-   | beam.io.WriteToText(known_args.output))
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=options) as p:
+    (p | beam.io.ReadFromText(known_args.input)
+     | Count1()
+     | beam.io.WriteToText(known_args.output))
 
 
 @beam.ptransform_fn
@@ -66,11 +65,10 @@ def Count2(pcoll):  # pylint: disable=invalid-name
 def run_count2(known_args, options):
   """Runs the second example pipeline."""
   logging.info('Running second pipeline')
-  p = beam.Pipeline(options=options)
-  (p | ReadFromText(known_args.input)
-   | Count2()  # pylint: disable=no-value-for-parameter
-   | WriteToText(known_args.output))
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=options) as p:
+    (p | ReadFromText(known_args.input)
+     | Count2()  # pylint: disable=no-value-for-parameter
+     | WriteToText(known_args.output))
 
 
 @beam.ptransform_fn
@@ -93,11 +91,10 @@ def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
 def run_count3(known_args, options):
   """Runs the third example pipeline."""
   logging.info('Running third pipeline')
-  p = beam.Pipeline(options=options)
-  (p | ReadFromText(known_args.input)
-   | Count3(2)  # pylint: disable=no-value-for-parameter
-   | WriteToText(known_args.output))
-  p.run()
+  with beam.Pipeline(options=options) as p:
+    (p | ReadFromText(known_args.input)
+     | Count3(2)  # pylint: disable=no-value-for-parameter
+     | WriteToText(known_args.output))
 
 
 def get_args(argv):

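Count2 and Count3 above are function-based transforms: @beam.ptransform_fn
binds the first parameter (the input PCollection) through the | operator,
which is why the call sites pass only the remaining arguments and carry the
no-value-for-parameter pylint suppression. A self-contained sketch of the
pattern, with MultiplyBy as an illustrative name:

import apache_beam as beam

@beam.ptransform_fn
def MultiplyBy(pcoll, factor):  # pylint: disable=invalid-name
  """pcoll is supplied by the | operator; callers pass only factor."""
  return pcoll | beam.Map(lambda x: x * factor)

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([1, 2, 3])
       | MultiplyBy(10))  # pylint: disable=no-value-for-parameter
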
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index c7c6dba..7aaccb4 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,12 +40,11 @@ class CustomCountTest(unittest.TestCase):
     self.run_pipeline(custom_ptransform.Count3(factor), factor=factor)
 
   def run_pipeline(self, count_implementation, factor=1):
-    p = TestPipeline()
-    words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
-    result = words | count_implementation
-    assert_that(
-        result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
-    p.run()
+    with TestPipeline() as p:
+      words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+      result = words | count_implementation
+      assert_that(
+          result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 411feb8..7161cff 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -135,18 +135,15 @@ class EntityWrapper(object):
 
 def write_to_datastore(project, user_options, pipeline_options):
   """Creates a pipeline that writes entities to Cloud Datastore."""
-  p = beam.Pipeline(options=pipeline_options)
-
-  # pylint: disable=expression-not-assigned
-  (p
-   | 'read' >> ReadFromText(user_options.input)
-   | 'create entity' >> beam.Map(
-       EntityWrapper(user_options.namespace, user_options.kind,
-                     user_options.ancestor).make_entity)
-   | 'write to datastore' >> WriteToDatastore(project))
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  # Actually run the pipeline (all operations above are deferred).
-  p.run().wait_until_finish()
+    # pylint: disable=expression-not-assigned
+    (p
+     | 'read' >> ReadFromText(user_options.input)
+     | 'create entity' >> beam.Map(
+         EntityWrapper(user_options.namespace, user_options.kind,
+                       user_options.ancestor).make_entity)
+     | 'write to datastore' >> WriteToDatastore(project))
 
 
 def make_ancestor_query(kind, namespace, ancestor):
@@ -196,7 +193,6 @@ def read_from_datastore(project, user_options, pipeline_options):
   output | 'write' >> beam.io.WriteToText(file_path_prefix=user_options.output,
                                           num_shards=user_options.num_shards)
 
-  # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   # Wait until completion, main thread would access post-completion job results.
   result.wait_until_finish()

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index 374001c..1fbf763 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -86,20 +86,17 @@ def run(argv=None):
                       help='Numeric value of month to filter on.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
-  p = beam.Pipeline(argv=pipeline_args)
+  with beam.Pipeline(argv=pipeline_args) as p:
 
-  input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
+    input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
 
-  # pylint: disable=expression-not-assigned
-  (filter_cold_days(input_data, known_args.month_filter)
-   | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
-       known_args.output,
-       schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
-       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
-
-  # Actually run the pipeline (all operations above are deferred).
-  p.run()
+    # pylint: disable=expression-not-assigned
+    (filter_cold_days(input_data, known_args.month_filter)
+     | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
+         known_args.output,
+         schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
+         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6bdadae..9c0d04b 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -95,28 +95,27 @@ def run(args=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Register the custom coder for the Player class, so that it will be used in
-  # the computation.
-  coders.registry.register_coder(Player, PlayerCoder)
-
-  (p  # pylint: disable=expression-not-assigned
-   | ReadFromText(known_args.input)
-   # The get_players function is annotated with a type hint above, so the type
-   # system knows the output type of the following operation is a key-value pair
-   # of a Player and an int. Please see the documentation for details on
-   # types that are inferred automatically as well as other ways to specify
-   # type hints.
-   | beam.Map(get_players)
-   # The output type hint of the previous step is used to infer that the key
-   # type of the following operation is the Player type. Since a custom coder
-   # is registered for the Player class above, a PlayerCoder will be used to
-   # encode Player objects as keys for this combine operation.
-   | beam.CombinePerKey(sum)
-   | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | WriteToText(known_args.output))
-  return p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    # Register the custom coder for the Player class, so that it will be used in
+    # the computation.
+    coders.registry.register_coder(Player, PlayerCoder)
+
+    (p  # pylint: disable=expression-not-assigned
+     | ReadFromText(known_args.input)
+     # The get_players function is annotated with a type hint above, so the type
+     # system knows the output type of the following operation is a key-value
+     # pair of a Player and an int. Please see the documentation for details on
+     # types that are inferred automatically as well as other ways to specify
+     # type hints.
+     | beam.Map(get_players)
+     # The output type hint of the previous step is used to infer that the key
+     # type of the following operation is the Player type. Since a custom coder
+     # is registered for the Player class above, a PlayerCoder will be used to
+     # encode Player objects as keys for this combine operation.
+     | beam.CombinePerKey(sum)
+     | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+     | WriteToText(known_args.output))
 
 
 if __name__ == '__main__':

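The register_coder call above is what connects the Player type hint on
get_players to PlayerCoder during the CombinePerKey shuffle. A hedged sketch
of what such a pair typically looks like; the real Player and PlayerCoder are
defined earlier in group_with_coder.py and may differ:

from apache_beam import coders

class Player(object):
  def __init__(self, name):
    self.name = name

class PlayerCoder(coders.Coder):
  """Encodes a Player as a prefixed name string."""

  def encode(self, player):
    return 'player:%s' % player.name

  def decode(self, encoded):
    return Player(encoded.split(':', 1)[1])

  def is_deterministic(self):
    # Must be deterministic to serve as a key coder for grouping.
    return True

coders.registry.register_coder(Player, PlayerCoder)
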
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 4e87966..268ba8d 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase):
     temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
     group_with_coder.run([
         '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path]).wait_until_finish()
+        '--output=%s.result' % temp_path])
     # Parse result file and compare.
     results = []
     with open(temp_path + '.result-00000-of-00001') as result_file:
@@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase):
     group_with_coder.run([
         '--no_pipeline_type_check',
         '--input=%s*' % temp_path,
-        '--output=%s.result' % temp_path]).wait_until_finish()
+        '--output=%s.result' % temp_path])
     # Parse result file and compare.
     results = []
     with open(temp_path + '.result-00000-of-00001') as result_file:

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 4f53c61..9acdd90 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -70,64 +70,63 @@ def run(argv=None, assert_results=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  # Helper: read a tab-separated key-value mapping from a text file, escape all
-  # quotes/backslashes, and convert it a PCollection of (key, value) pairs.
-  def read_kv_textfile(label, textfile):
-    return (p
-            | 'Read: %s' % label >> ReadFromText(textfile)
-            | 'Backslash: %s' % label >> beam.Map(
-                lambda x: re.sub(r'\\', r'\\\\', x))
-            | 'EscapeQuotes: %s' % label >> beam.Map(
-                lambda x: re.sub(r'"', r'\"', x))
-            | 'Split: %s' % label >> beam.Map(
-                lambda x: re.split(r'\t+', x, 1)))
-
-  # Read input databases.
-  email = read_kv_textfile('email', known_args.input_email)
-  phone = read_kv_textfile('phone', known_args.input_phone)
-  snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
-
-  # Group together all entries under the same name.
-  grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
-
-  # Prepare tab-delimited output; something like this:
-  # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
-  tsv_lines = grouped | beam.Map(
-      lambda (name, (email, phone, snailmail)): '\t'.join(
-          ['"%s"' % name,
-           '"%s"' % ','.join(email),
-           '"%s"' % ','.join(phone),
-           '"%s"' % next(iter(snailmail), '')]))
-
-  # Compute some stats about our database of people.
-  luddites = grouped | beam.Filter(  # People without email.
-      lambda (name, (email, phone, snailmail)): not next(iter(email), None))
-  writers = grouped | beam.Filter(   # People without phones.
-      lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
-  nomads = grouped | beam.Filter(    # People without addresses.
-      lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
-
-  num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
-  num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
-  num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
-
-  # Write tab-delimited output.
-  # pylint: disable=expression-not-assigned
-  tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
-
-  # TODO(silviuc): Move the assert_results logic to the unit test.
-  if assert_results is not None:
-    expected_luddites, expected_writers, expected_nomads = assert_results
-    assert_that(num_luddites, equal_to([expected_luddites]),
-                label='assert:luddites')
-    assert_that(num_writers, equal_to([expected_writers]),
-                label='assert:writers')
-    assert_that(num_nomads, equal_to([expected_nomads]),
-                label='assert:nomads')
-  # Execute pipeline.
-  return p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    # Helper: read a tab-separated key-value mapping from a text file,
+    # escape all quotes/backslashes, and convert it to a PCollection of
+    # (key, value) pairs.
+    def read_kv_textfile(label, textfile):
+      return (p
+              | 'Read: %s' % label >> ReadFromText(textfile)
+              | 'Backslash: %s' % label >> beam.Map(
+                  lambda x: re.sub(r'\\', r'\\\\', x))
+              | 'EscapeQuotes: %s' % label >> beam.Map(
+                  lambda x: re.sub(r'"', r'\"', x))
+              | 'Split: %s' % label >> beam.Map(
+                  lambda x: re.split(r'\t+', x, 1)))
+
+    # Read input databases.
+    email = read_kv_textfile('email', known_args.input_email)
+    phone = read_kv_textfile('phone', known_args.input_phone)
+    snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
+
+    # Group together all entries under the same name.
+    grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
+
+    # Prepare tab-delimited output; something like this:
+    # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
+    tsv_lines = grouped | beam.Map(
+        lambda (name, (email, phone, snailmail)): '\t'.join(
+            ['"%s"' % name,
+             '"%s"' % ','.join(email),
+             '"%s"' % ','.join(phone),
+             '"%s"' % next(iter(snailmail), '')]))
+
+    # Compute some stats about our database of people.
+    luddites = grouped | beam.Filter(  # People without email.
+        lambda (name, (email, phone, snailmail)): not next(iter(email), None))
+    writers = grouped | beam.Filter(   # People without phones.
+        lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
+    nomads = grouped | beam.Filter(    # People without addresses.
+        lambda (name, (e, ph, snailmail)): not next(iter(snailmail), None))
+
+    num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
+    num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
+    num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
+
+    # Write tab-delimited output.
+    # pylint: disable=expression-not-assigned
+    tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
+
+    # TODO(silviuc): Move the assert_results logic to the unit test.
+    if assert_results is not None:
+      expected_luddites, expected_writers, expected_nomads = assert_results
+      assert_that(num_luddites, equal_to([expected_luddites]),
+                  label='assert:luddites')
+      assert_that(num_writers, equal_to([expected_writers]),
+                  label='assert:writers')
+      assert_that(num_nomads, equal_to([expected_nomads]),
+                  label='assert:nomads')
 
 
 if __name__ == '__main__':

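For context on the group_by_name step: applying beam.CoGroupByKey() to the
(email, phone, snailmail) tuple joins the three keyed collections by name,
producing one element per name with one iterable of values per input, which
is exactly the (name, (email, phone, snailmail)) shape the lambdas above
unpack. A small self-contained illustration, hedged (two inputs instead of
three, illustrative data):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
  emails = p | 'email' >> beam.Create([('ann', 'ann@example.com')])
  phones = p | 'phone' >> beam.Create([('ann', '555-0100'),
                                       ('bob', '555-0199')])
  grouped = (emails, phones) | beam.CoGroupByKey()
  # Normalize the per-input iterables to lists for a stable comparison.
  as_lists = grouped | beam.Map(
      lambda kv: (kv[0], (list(kv[1][0]), list(kv[1][1]))))
  assert_that(as_lists, equal_to([
      ('ann', (['ann@example.com'], ['555-0100'])),
      ('bob', ([], ['555-0199']))]))
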
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index 09f71d3..b3be0dd 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -107,13 +107,12 @@ class MergeContactsTest(unittest.TestCase):
 
     result_prefix = self.create_temp_file('')
 
-    result = mergecontacts.run([
+    mergecontacts.run([
         '--input_email=%s' % path_email,
         '--input_phone=%s' % path_phone,
         '--input_snailmail=%s' % path_snailmail,
         '--output_tsv=%s.tsv' % result_prefix,
         '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3))
-    result.wait_until_finish()
 
     with open('%s.tsv-00000-of-00001' % result_prefix) as f:
       contents = f.read()

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 9759f48..2316c66 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -141,43 +141,41 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
-  lines = p | ReadFromText(known_args.input)
-
-  # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-  split_lines_result = (lines
-                        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-                            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-                            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-                            main='words'))
-
-  # split_lines_result is an object of type DoOutputsTuple. It supports
-  # accessing result in alternative ways.
-  words, _, _ = split_lines_result
-  short_words = split_lines_result[
-      SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-  character_count = split_lines_result.tag_character_count
-
-  # pylint: disable=expression-not-assigned
-  (character_count
-   | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-   | beam.GroupByKey()
-   | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
-   | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-  # pylint: disable=expression-not-assigned
-  (short_words
-   | 'count short words' >> CountWords()
-   | 'write short words' >> WriteToText(
-       known_args.output + '-short-words'))
-
-  # pylint: disable=expression-not-assigned
-  (words
-   | 'count words' >> CountWords()
-   | 'write words' >> WriteToText(known_args.output + '-words'))
-
-  return p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    lines = p | ReadFromText(known_args.input)
+
+    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+    split_lines_result = (lines
+                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+                              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+                              main='words'))
+
+    # split_lines_result is an object of type DoOutputsTuple. It supports
+    # accessing result in alternative ways.
+    words, _, _ = split_lines_result
+    short_words = split_lines_result[
+        SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+    character_count = split_lines_result.tag_character_count
+
+    # pylint: disable=expression-not-assigned
+    (character_count
+     | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+     | beam.GroupByKey()
+     | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
+     | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+    # pylint: disable=expression-not-assigned
+    (short_words
+     | 'count short words' >> CountWords()
+     | 'write short words' >> WriteToText(
+         known_args.output + '-short-words'))
+
+    # pylint: disable=expression-not-assigned
+    (words
+     | 'count words' >> CountWords()
+     | 'write words' >> WriteToText(known_args.output + '-words'))
 
 
 if __name__ == '__main__':

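The with_outputs call above returns a DoOutputsTuple, and the hunk
demonstrates all three ways to read it: tuple unpacking (main output first,
then tags in declared order), indexing by tag string, and attribute access
using the tag string as the attribute name. A self-contained sketch;
SplitEvenOddFn is illustrative, and pvalue.TaggedOutput is assumed to be the
tagged-emission class in this SDK version (older SDKs used a different name):

import apache_beam as beam
from apache_beam import pvalue

class SplitEvenOddFn(beam.DoFn):
  OUTPUT_TAG_ODD = 'tag_odd'

  def process(self, element):
    if element % 2:
      yield pvalue.TaggedOutput(self.OUTPUT_TAG_ODD, element)
    else:
      yield element  # untagged values go to the main output

with beam.Pipeline() as p:
  results = (p
             | beam.Create([1, 2, 3, 4])
             | beam.ParDo(SplitEvenOddFn()).with_outputs(
                 SplitEvenOddFn.OUTPUT_TAG_ODD, main='even'))
  evens, odds = results                                 # unpack: main first
  odds_by_tag = results[SplitEvenOddFn.OUTPUT_TAG_ODD]  # index by tag
  odds_by_attr = results.tag_odd                        # attribute access
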
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 2c9111c..3ddd668 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase):
 
     multiple_output_pardo.run([
         '--input=%s*' % temp_path,
-        '--output=%s' % result_prefix]).wait_until_finish()
+        '--output=%s' % result_prefix])
 
     expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n')))
     with open(result_prefix + '-chars-00000-of-00001') as f:

http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 7259572..70929e9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -147,18 +147,15 @@ def model_pipelines(argv):
   pipeline_options = PipelineOptions(argv)
   my_options = pipeline_options.view_as(MyOptions)
 
-  p = beam.Pipeline(options=pipeline_options)
-
-  (p
-   | beam.io.ReadFromText(my_options.input)
-   | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-   | beam.Map(lambda x: (x, 1))
-   | beam.combiners.Count.PerKey()
-   | beam.io.WriteToText(my_options.output))
-
-  result = p.run()
+  with beam.Pipeline(options=pipeline_options) as p:
+
+    (p
+     | beam.io.ReadFromText(my_options.input)
+     | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+     | beam.Map(lambda x: (x, 1))
+     | beam.combiners.Count.PerKey()
+     | beam.io.WriteToText(my_options.output))
   # [END model_pipelines]
-  result.wait_until_finish()
 
 
 def model_pcollection(argv):
@@ -178,21 +175,18 @@ def model_pcollection(argv):
   my_options = pipeline_options.view_as(MyOptions)
 
   # [START model_pcollection]
-  p = beam.Pipeline(options=pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
 
-  lines = (p
-           | beam.Create([
-               'To be, or not to be: that is the question: ',
-               'Whether \'tis nobler in the mind to suffer ',
-               'The slings and arrows of outrageous fortune, ',
-               'Or to take arms against a sea of troubles, ']))
-  # [END model_pcollection]
+    lines = (p
+             | beam.Create([
+                 'To be, or not to be: that is the question: ',
+                 'Whether \'tis nobler in the mind to suffer ',
+                 'The slings and arrows of outrageous fortune, ',
+                 'Or to take arms against a sea of troubles, ']))
+    # [END model_pcollection]
 
-  (lines
-   | beam.io.WriteToText(my_options.output))
-
-  result = p.run()
-  result.wait_until_finish()
+    (lines
+     | beam.io.WriteToText(my_options.output))
 
 
 def pipeline_options_remote(argv):
@@ -297,12 +291,10 @@ def pipeline_options_command_line(argv):
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   # Create the Pipeline with remaining arguments.
-  p = beam.Pipeline(argv=pipeline_args)
-  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
-  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
-  # [END pipeline_options_command_line]
-
-  p.run().wait_until_finish()
+  with beam.Pipeline(argv=pipeline_args) as p:
+    lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
+    lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
+    # [END pipeline_options_command_line]
 
 
 def pipeline_logging(lines, output):
@@ -329,13 +321,11 @@ def pipeline_logging(lines, output):
   # Remaining WordCount example code ...
   # [END pipeline_logging]
 
-  p = TestPipeline()  # Use TestPipeline for testing.
-  (p
-   | beam.Create(lines)
-   | beam.ParDo(ExtractWordsFn())
-   | beam.io.WriteToText(output))
-
-  p.run()
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    (p
+     | beam.Create(lines)
+     | beam.ParDo(ExtractWordsFn())
+     | beam.io.WriteToText(output))
 
 
 def pipeline_monitoring(renames):
@@ -385,20 +375,19 @@ def pipeline_monitoring(renames):
 
   pipeline_options = PipelineOptions()
   options = pipeline_options.view_as(WordCountOptions)
-  p = TestPipeline()  # Use TestPipeline for testing.
+  with TestPipeline() as p:  # Use TestPipeline for testing.
 
-  # [START pipeline_monitoring_execution]
-  (p
-   # Read the lines of the input text.
-   | 'ReadLines' >> beam.io.ReadFromText(options.input)
-   # Count the words.
-   | CountWords()
-   # Write the formatted word counts to output.
-   | 'WriteCounts' >> beam.io.WriteToText(options.output))
-  # [END pipeline_monitoring_execution]
+    # [START pipeline_monitoring_execution]
+    (p
+     # Read the lines of the input text.
+     | 'ReadLines' >> beam.io.ReadFromText(options.input)
+     # Count the words.
+     | CountWords()
+     # Write the formatted word counts to output.
+     | 'WriteCounts' >> beam.io.WriteToText(options.output))
+    # [END pipeline_monitoring_execution]
 
-  p.visit(SnippetUtils.RenameFiles(renames))
-  p.run()
+    p.visit(SnippetUtils.RenameFiles(renames))
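
Because leaving the with block is what triggers run(), any pre-run pipeline
rewriting, such as the p.visit(...) call above, has to stay inside the
block. Schematically:

    with TestPipeline() as p:
      # ... build the pipeline ...
      p.visit(SnippetUtils.RenameFiles(renames))  # before the implicit run()
    # run() and wait_until_finish() happen here, on exit
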
 
 
 def examples_wordcount_minimal(renames):
@@ -478,40 +467,39 @@ def examples_wordcount_wordcount(renames):
                           default='gs://my-bucket/input')
 
   options = PipelineOptions(argv)
-  p = beam.Pipeline(options=options)
-  # [END examples_wordcount_wordcount_options]
+  with beam.Pipeline(options=options) as p:
+    # [END examples_wordcount_wordcount_options]
 
-  lines = p | beam.io.ReadFromText(
-      'gs://dataflow-samples/shakespeare/kinglear.txt')
+    lines = p | beam.io.ReadFromText(
+        'gs://dataflow-samples/shakespeare/kinglear.txt')
 
-  # [START examples_wordcount_wordcount_composite]
-  class CountWords(beam.PTransform):
+    # [START examples_wordcount_wordcount_composite]
+    class CountWords(beam.PTransform):
 
-    def expand(self, pcoll):
-      return (pcoll
-              # Convert lines of text into individual words.
-              | 'ExtractWords' >> beam.FlatMap(
-                  lambda x: re.findall(r'[A-Za-z\']+', x))
+      def expand(self, pcoll):
+        return (pcoll
+                # Convert lines of text into individual words.
+                | 'ExtractWords' >> beam.FlatMap(
+                    lambda x: re.findall(r'[A-Za-z\']+', x))
 
-              # Count the number of times each word occurs.
-              | beam.combiners.Count.PerElement())
+                # Count the number of times each word occurs.
+                | beam.combiners.Count.PerElement())
 
-  counts = lines | CountWords()
-  # [END examples_wordcount_wordcount_composite]
+    counts = lines | CountWords()
+    # [END examples_wordcount_wordcount_composite]
 
-  # [START examples_wordcount_wordcount_dofn]
-  class FormatAsTextFn(beam.DoFn):
+    # [START examples_wordcount_wordcount_dofn]
+    class FormatAsTextFn(beam.DoFn):
 
-    def process(self, element):
-      word, count = element
-      yield '%s: %s' % (word, count)
+      def process(self, element):
+        word, count = element
+        yield '%s: %s' % (word, count)
 
-  formatted = counts | beam.ParDo(FormatAsTextFn())
-  # [END examples_wordcount_wordcount_dofn]
+    formatted = counts | beam.ParDo(FormatAsTextFn())
+    # [END examples_wordcount_wordcount_dofn]
 
-  formatted |  beam.io.WriteToText('gs://my-bucket/counts.txt')
-  p.visit(SnippetUtils.RenameFiles(renames))
-  p.run().wait_until_finish()
+    formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
+    p.visit(SnippetUtils.RenameFiles(renames))
 
 
 def examples_wordcount_debugging(renames):
@@ -558,27 +546,27 @@ def examples_wordcount_debugging(renames):
   # [END example_wordcount_debugging_logging]
   # [END example_wordcount_debugging_aggregators]
 
-  p = TestPipeline()  # Use TestPipeline for testing.
-  filtered_words = (
-      p
-      | beam.io.ReadFromText(
-          'gs://dataflow-samples/shakespeare/kinglear.txt')
-      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-      | beam.combiners.Count.PerElement()
-      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    filtered_words = (
+        p
+        | beam.io.ReadFromText(
+            'gs://dataflow-samples/shakespeare/kinglear.txt')
+        | 'ExtractWords' >> beam.FlatMap(
+            lambda x: re.findall(r'[A-Za-z\']+', x))
+        | beam.combiners.Count.PerElement()
+        | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
-  # [START example_wordcount_debugging_assert]
-  beam.testing.util.assert_that(
-      filtered_words, beam.testing.util.equal_to(
-          [('Flourish', 3), ('stomach', 1)]))
-  # [END example_wordcount_debugging_assert]
+    # [START example_wordcount_debugging_assert]
+    beam.testing.util.assert_that(
+        filtered_words, beam.testing.util.equal_to(
+            [('Flourish', 3), ('stomach', 1)]))
+    # [END example_wordcount_debugging_assert]
 
-  output = (filtered_words
-            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
+    output = (filtered_words
+              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+              | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
 
-  p.visit(SnippetUtils.RenameFiles(renames))
-  p.run()
+    p.visit(SnippetUtils.RenameFiles(renames))
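
The FilterTextFn referenced above is defined in the debugging snippet blocks
elided from this hunk; a rough sketch of the shape such a DoFn takes,
assuming the beam.metrics counter API for the aggregator part:

    import re

    import apache_beam as beam
    from apache_beam.metrics import Metrics

    class FilterTextFn(beam.DoFn):
      """Keeps only (word, count) pairs whose word matches a pattern."""

      def __init__(self, pattern):
        super(FilterTextFn, self).__init__()
        self.pattern = pattern
        self.matched_words = Metrics.counter(self.__class__, 'matched_words')

      def process(self, element):
        word, _ = element
        if re.match(self.pattern, word):
          self.matched_words.inc()
          yield element
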
 
 
 import apache_beam as beam
@@ -659,16 +647,14 @@ def model_custom_source(count):
 
   # Using the source in an example pipeline.
   # [START model_custom_source_use_new_source]
-  p = beam.Pipeline(options=PipelineOptions())
-  numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
-  # [END model_custom_source_use_new_source]
+  with beam.Pipeline(options=PipelineOptions()) as p:
+    numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
+    # [END model_custom_source_use_new_source]
 
-  lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
-  assert_that(
-      lines, equal_to(
-          ['line ' + str(number) for number in range(0, count)]))
-
-  p.run().wait_until_finish()
+    lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+    assert_that(
+        lines, equal_to(
+            ['line ' + str(number) for number in range(0, count)]))
 
   # We recommend users to start Source classes with an underscore to discourage
   # using the Source class directly when a PTransform for the source is
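
That recommendation amounts to keeping the Source class private and exposing
a PTransform that wraps beam.io.Read around it. An illustrative sketch (the
underscore-prefixed name and wrapper below are hypothetical; the actual
wrapper lives elsewhere in snippets.py):

    import apache_beam as beam

    class ReadFromCountingSource(beam.PTransform):

      def __init__(self, count):
        super(ReadFromCountingSource, self).__init__()
        self._count = count

      def expand(self, pbegin):
        return pbegin | beam.io.Read(_CountingSource(self._count))

    # Usage then hides the source entirely:
    #   numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)

The same convention applies to sinks below: keep the Sink class private and
expose a PTransform (e.g. WriteToKVSink) that wraps beam.io.Write.
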
@@ -796,14 +782,12 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
 
   # Using the new sink in an example pipeline.
   # [START model_custom_sink_use_new_sink]
-  p = beam.Pipeline(options=PipelineOptions())
-  kvs = p | 'CreateKVs' >> beam.Create(KVs)
+  with beam.Pipeline(options=PipelineOptions()) as p:
+    kvs = p | 'CreateKVs' >> beam.Create(KVs)
 
-  kvs | 'WriteToSimpleKV' >> beam.io.Write(
-      SimpleKVSink('http://url_to_simple_kv/', final_table_name))
-  # [END model_custom_sink_use_new_sink]
-
-  p.run().wait_until_finish()
+    kvs | 'WriteToSimpleKV' >> beam.io.Write(
+        SimpleKVSink('http://url_to_simple_kv/', final_table_name))
+    # [END model_custom_sink_use_new_sink]
 
   # We recommend users to start Sink class names with an underscore to
   # discourage using the Sink class directly when a PTransform for the sink is
@@ -828,13 +812,11 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   final_table_name = final_table_name_with_ptransform
 
   # [START model_custom_sink_use_ptransform]
-  p = beam.Pipeline(options=PipelineOptions())
-  kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
-  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
-      'http://url_to_simple_kv/', final_table_name)
-  # [END model_custom_sink_use_ptransform]
-
-  p.run().wait_until_finish()
+  with beam.Pipeline(options=PipelineOptions()) as p:
+    kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
+    kvs | 'WriteToSimpleKV' >> WriteToKVSink(
+        'http://url_to_simple_kv/', final_table_name)
+    # [END model_custom_sink_use_ptransform]
 
 
 def model_textio(renames):
@@ -847,37 +829,35 @@ def model_textio(renames):
   from apache_beam.options.pipeline_options import PipelineOptions
 
   # [START model_textio_read]
-  p = beam.Pipeline(options=PipelineOptions())
-  # [START model_pipelineio_read]
-  lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
-  # [END model_pipelineio_read]
-  # [END model_textio_read]
-
-  # [START model_textio_write]
-  filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
-  # [START model_pipelineio_write]
-  filtered_words | 'WriteToText' >> beam.io.WriteToText(
-      '/path/to/numbers', file_name_suffix='.csv')
-  # [END model_pipelineio_write]
-  # [END model_textio_write]
+  with beam.Pipeline(options=PipelineOptions()) as p:
+    # [START model_pipelineio_read]
+    lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
+    # [END model_pipelineio_read]
+    # [END model_textio_read]
 
-  p.visit(SnippetUtils.RenameFiles(renames))
-  p.run().wait_until_finish()
+    # [START model_textio_write]
+    filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
+    # [START model_pipelineio_write]
+    filtered_words | 'WriteToText' >> beam.io.WriteToText(
+        '/path/to/numbers', file_name_suffix='.csv')
+    # [END model_pipelineio_write]
+    # [END model_textio_write]
+
+    p.visit(SnippetUtils.RenameFiles(renames))
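
WriteToText also accepts sharding controls; for example, forcing a single
output file (handy in small tests, at the cost of serializing the write):

    filtered_words | 'WriteToText' >> beam.io.WriteToText(
        '/path/to/numbers', file_name_suffix='.csv', num_shards=1)
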
 
 
 def model_textio_compressed(renames, expected):
   """Using a Read Transform to read compressed text files."""
-  p = TestPipeline()
+  with TestPipeline() as p:
 
-  # [START model_textio_write_compressed]
-  lines = p | 'ReadFromText' >> beam.io.ReadFromText(
-      '/path/to/input-*.csv.gz',
-      compression_type=beam.io.filesystem.CompressionTypes.GZIP)
-  # [END model_textio_write_compressed]
+    # [START model_textio_write_compressed]
+    lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+        '/path/to/input-*.csv.gz',
+        compression_type=beam.io.filesystem.CompressionTypes.GZIP)
+    # [END model_textio_write_compressed]
 
-  assert_that(lines, equal_to(expected))
-  p.visit(SnippetUtils.RenameFiles(renames))
-  p.run().wait_until_finish()
+    assert_that(lines, equal_to(expected))
+    p.visit(SnippetUtils.RenameFiles(renames))
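
The explicit compression_type is only needed when the path does not reveal
the codec: ReadFromText defaults to CompressionTypes.AUTO, which infers the
compression from the file suffix, so for *.gz paths the shorter spelling
should behave the same:

    lines = p | 'ReadFromText' >> beam.io.ReadFromText(
        '/path/to/input-*.csv.gz')
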
 
 
 def model_datastoreio():
@@ -987,43 +967,40 @@ def model_composite_transform_example(contents, output_path):
   # [END composite_ptransform_apply_method]
   # [END composite_transform_example]
 
-  p = TestPipeline()  # Use TestPipeline for testing.
-  (p
-   | beam.Create(contents)
-   | CountWords()
-   | beam.io.WriteToText(output_path))
-  p.run()
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    (p
+     | beam.Create(contents)
+     | CountWords()
+     | beam.io.WriteToText(output_path))
 
 
 def model_multiple_pcollections_flatten(contents, output_path):
   """Merging a PCollection with Flatten."""
   some_hash_fn = lambda s: ord(s[0])
-  import apache_beam as beam
-  p = TestPipeline()  # Use TestPipeline for testing.
   partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
-
-  # Partition into deciles
-  partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3)
-  pcoll1 = partitioned[0]
-  pcoll2 = partitioned[1]
-  pcoll3 = partitioned[2]
-
-  # Flatten them back into 1
-
-  # A collection of PCollection objects can be represented simply
-  # as a tuple (or list) of PCollections.
-  # (The SDK for Python has no separate type to store multiple
-  # PCollection objects, whether containing the same or different
-  # types.)
-  # [START model_multiple_pcollections_flatten]
-  merged = (
-      (pcoll1, pcoll2, pcoll3)
-      # A list of tuples can be "piped" directly into a Flatten transform.
-      | beam.Flatten())
-  # [END model_multiple_pcollections_flatten]
-  merged | beam.io.WriteToText(output_path)
-
-  p.run()
+  import apache_beam as beam
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+
+    # Partition the input into three groups by hash.
+    partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3)
+    pcoll1 = partitioned[0]
+    pcoll2 = partitioned[1]
+    pcoll3 = partitioned[2]
+
+    # Flatten them back into one PCollection.
+
+    # A collection of PCollection objects can be represented simply
+    # as a tuple (or list) of PCollections.
+    # (The SDK for Python has no separate type to store multiple
+    # PCollection objects, whether containing the same or different
+    # types.)
+    # [START model_multiple_pcollections_flatten]
+    merged = (
+        (pcoll1, pcoll2, pcoll3)
+        # A list of tuples can be "piped" directly into a Flatten transform.
+        | beam.Flatten())
+    # [END model_multiple_pcollections_flatten]
+    merged | beam.io.WriteToText(output_path)
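
A caveat worth noting with Flatten: every input PCollection must belong to
the same Pipeline object (and, when type hints are enabled, carry compatible
element types), so merging collections from two different pipelines fails at
construction time (pcoll_from_p1 and pcoll_from_p2 are hypothetical names):

    # error: inputs to Flatten come from different pipelines
    merged = (pcoll_from_p1, pcoll_from_p2) | beam.Flatten()
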
 
 
 def model_multiple_pcollections_partition(contents, output_path):
@@ -1034,25 +1011,23 @@ def model_multiple_pcollections_partition(contents, output_path):
     """Assume i in [0,100)."""
     return i
   import apache_beam as beam
-  p = TestPipeline()  # Use TestPipeline for testing.
+  with TestPipeline() as p:  # Use TestPipeline for testing.
 
-  students = p | beam.Create(contents)
+    students = p | beam.Create(contents)
 
-  # [START model_multiple_pcollections_partition]
-  def partition_fn(student, num_partitions):
-    return int(get_percentile(student) * num_partitions / 100)
+    # [START model_multiple_pcollections_partition]
+    def partition_fn(student, num_partitions):
+      return int(get_percentile(student) * num_partitions / 100)
 
-  by_decile = students | beam.Partition(partition_fn, 10)
-  # [END model_multiple_pcollections_partition]
-  # [START model_multiple_pcollections_partition_40th]
-  fortieth_percentile = by_decile[4]
-  # [END model_multiple_pcollections_partition_40th]
+    by_decile = students | beam.Partition(partition_fn, 10)
+    # [END model_multiple_pcollections_partition]
+    # [START model_multiple_pcollections_partition_40th]
+    fortieth_percentile = by_decile[4]
+    # [END model_multiple_pcollections_partition_40th]
 
-  ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
-   | beam.Flatten()
-   | beam.io.WriteToText(output_path))
-
-  p.run()
+    ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
+     | beam.Flatten()
+     | beam.io.WriteToText(output_path))
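
Note that xrange is Python 2 only; under Python 3 the same statement would
be written with range:

    ([by_decile[d] for d in range(10) if d != 4] + [fortieth_percentile]
     | beam.Flatten()
     | beam.io.WriteToText(output_path))
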
 
 
 def model_group_by_key(contents, output_path):
@@ -1060,58 +1035,56 @@ def model_group_by_key(contents, output_path):
   import re
 
   import apache_beam as beam
-  p = TestPipeline()  # Use TestPipeline for testing.
-  words_and_counts = (
-      p
-      | beam.Create(contents)
-      | beam.FlatMap(lambda x: re.findall(r'\w+', x))
-      | 'one word' >> beam.Map(lambda w: (w, 1)))
-  # GroupByKey accepts a PCollection of (w, 1) and
-  # outputs a PCollection of (w, (1, 1, ...)).
-  # (A key/value pair is just a tuple in Python.)
-  # This is a somewhat forced example, since one could
-  # simply use beam.combiners.Count.PerElement here.
-  # [START model_group_by_key_transform]
-  grouped_words = words_and_counts | beam.GroupByKey()
-  # [END model_group_by_key_transform]
-  (grouped_words
-   | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
-   | beam.io.WriteToText(output_path))
-  p.run()
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    words_and_counts = (
+        p
+        | beam.Create(contents)
+        | beam.FlatMap(lambda x: re.findall(r'\w+', x))
+        | 'one word' >> beam.Map(lambda w: (w, 1)))
+    # GroupByKey accepts a PCollection of (w, 1) and
+    # outputs a PCollection of (w, (1, 1, ...)).
+    # (A key/value pair is just a tuple in Python.)
+    # This is a somewhat forced example, since one could
+    # simply use beam.combiners.Count.PerElement here.
+    # [START model_group_by_key_transform]
+    grouped_words = words_and_counts | beam.GroupByKey()
+    # [END model_group_by_key_transform]
+    (grouped_words
+     | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
+     | beam.io.WriteToText(output_path))
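
To make the GroupByKey step concrete, a tiny worked example (assumed input;
the order of values within a group is not guaranteed):

    # (word, 1) pairs in:      ('cat', 1), ('dog', 1), ('cat', 1)
    # after beam.GroupByKey(): ('cat', [1, 1]), ('dog', [1])
    # after 'count words':     ('cat', 2), ('dog', 1)
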
 
 
 def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   """Applying a CoGroupByKey Transform to a tuple."""
   import apache_beam as beam
-  p = TestPipeline()  # Use TestPipeline for testing.
-  # [START model_group_by_key_cogroupbykey_tuple]
-  # Each data set is represented by key-value pairs in separate PCollections.
-  # Both data sets share a common key type (in this example str).
-  # The email_list contains values such as: ('joe', 'joe@example.com') with
-  # multiple possible values for each key.
-  # The phone_list contains values such as: ('mary': '111-222-3333') with
-  # multiple possible values for each key.
-  emails = p | 'email' >> beam.Create(email_list)
-  phones = p | 'phone' >> beam.Create(phone_list)
-  # The result PCollection contains one key-value element for each key in the
-  # input PCollections. The key of the pair will be the key from the input and
-  # the value will be a dictionary with two entries: 'emails' - an iterable of
-  # all values for the current key in the emails PCollection and 'phones': an
-  # iterable of all values for the current key in the phones PCollection.
-  # For instance, if 'emails' contained ('joe', 'joe@example.com') and
-  # ('joe', 'joe@gmail.com'), then 'result' will contain the element
-  # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
-  result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
-
-  def join_info((name, info)):
-    return '; '.join(['%s' % name,
-                      '%s' % ','.join(info['emails']),
-                      '%s' % ','.join(info['phones'])])
-
-  contact_lines = result | beam.Map(join_info)
-  # [END model_group_by_key_cogroupbykey_tuple]
-  contact_lines | beam.io.WriteToText(output_path)
-  p.run()
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    # [START model_group_by_key_cogroupbykey_tuple]
+    # Each data set is represented by key-value pairs in separate PCollections.
+    # Both data sets share a common key type (in this example str).
+    # The email_list contains values such as: ('joe', 'joe@example.com') with
+    # multiple possible values for each key.
+    # The phone_list contains values such as: ('mary', '111-222-3333') with
+    # multiple possible values for each key.
+    emails = p | 'email' >> beam.Create(email_list)
+    phones = p | 'phone' >> beam.Create(phone_list)
+    # The result PCollection contains one key-value element for each key in the
+    # input PCollections. The key of the pair will be the key from the input and
+    # the value will be a dictionary with two entries: 'emails' - an iterable of
+    # all values for the current key in the emails PCollection and 'phones': an
+    # iterable of all values for the current key in the phones PCollection.
+    # For instance, if 'emails' contained ('joe', 'joe@example.com') and
+    # ('joe', 'joe@gmail.com'), then 'result' will contain the element
+    # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...})
+    result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
+
+    def join_info((name, info)):
+      return '; '.join(['%s' % name,
+                        '%s' % ','.join(info['emails']),
+                        '%s' % ','.join(info['phones'])])
+
+    contact_lines = result | beam.Map(join_info)
+    # [END model_group_by_key_cogroupbykey_tuple]
+    contact_lines | beam.io.WriteToText(output_path)
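
The def join_info((name, info)) signature uses Python 2 tuple parameter
unpacking, which PEP 3113 removed in Python 3; a forward-compatible spelling
unpacks inside the body instead:

    def join_info(name_info):
      (name, info) = name_info
      return '; '.join(['%s' % name,
                        '%s' % ','.join(info['emails']),
                        '%s' % ','.join(info['phones'])])
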
 
 
 def model_join_using_side_inputs(
@@ -1121,35 +1094,34 @@ def model_join_using_side_inputs(
   import apache_beam as beam
   from apache_beam.pvalue import AsIter
 
-  p = TestPipeline()  # Use TestPipeline for testing.
-  # [START model_join_using_side_inputs]
-  # This code performs a join by receiving the set of names as an input and
-  # passing PCollections that contain emails and phone numbers as side inputs
-  # instead of using CoGroupByKey.
-  names = p | 'names' >> beam.Create(name_list)
-  emails = p | 'email' >> beam.Create(email_list)
-  phones = p | 'phone' >> beam.Create(phone_list)
-
-  def join_info(name, emails, phone_numbers):
-    filtered_emails = []
-    for name_in_list, email in emails:
-      if name_in_list == name:
-        filtered_emails.append(email)
-
-    filtered_phone_numbers = []
-    for name_in_list, phone_number in phone_numbers:
-      if name_in_list == name:
-        filtered_phone_numbers.append(phone_number)
-
-    return '; '.join(['%s' % name,
-                      '%s' % ','.join(filtered_emails),
-                      '%s' % ','.join(filtered_phone_numbers)])
-
-  contact_lines = names | 'CreateContacts' >> beam.core.Map(
-      join_info, AsIter(emails), AsIter(phones))
-  # [END model_join_using_side_inputs]
-  contact_lines | beam.io.WriteToText(output_path)
-  p.run()
+  with TestPipeline() as p:  # Use TestPipeline for testing.
+    # [START model_join_using_side_inputs]
+    # This code performs a join by receiving the set of names as an input and
+    # passing PCollections that contain emails and phone numbers as side inputs
+    # instead of using CoGroupByKey.
+    names = p | 'names' >> beam.Create(name_list)
+    emails = p | 'email' >> beam.Create(email_list)
+    phones = p | 'phone' >> beam.Create(phone_list)
+
+    def join_info(name, emails, phone_numbers):
+      filtered_emails = []
+      for name_in_list, email in emails:
+        if name_in_list == name:
+          filtered_emails.append(email)
+
+      filtered_phone_numbers = []
+      for name_in_list, phone_number in phone_numbers:
+        if name_in_list == name:
+          filtered_phone_numbers.append(phone_number)
+
+      return '; '.join(['%s' % name,
+                        '%s' % ','.join(filtered_emails),
+                        '%s' % ','.join(filtered_phone_numbers)])
+
+    contact_lines = names | 'CreateContacts' >> beam.core.Map(
+        join_info, AsIter(emails), AsIter(phones))
+    # [END model_join_using_side_inputs]
+    contact_lines | beam.io.WriteToText(output_path)
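
AsIter hands each join_info call the side input as a lazily re-iterable
stream; when the side input is scanned repeatedly, as it is here once per
name, AsList may be the better fit since it materializes the values as an
in-memory list on each worker. A sketch of the substitution (same pipeline
as above):

    from apache_beam.pvalue import AsList

    contact_lines = names | 'CreateContacts' >> beam.core.Map(
        join_info, AsList(emails), AsList(phones))
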
 
 
 # [START model_library_transforms_keys]


[3/3] beam git commit: Closes #3180

Posted by ro...@apache.org.
Closes #3180


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/474345f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/474345f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/474345f5

Branch: refs/heads/master
Commit: 474345f5987e47a22d063c7bfcb3638c85a57e64
Parents: d0601b3 3a62b4f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 23 10:19:22 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 23 10:19:22 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |  19 +-
 .../examples/complete/autocomplete_test.py      |  31 +-
 .../examples/complete/estimate_pi.py            |  11 +-
 .../examples/complete/estimate_pi_test.py       |  12 +-
 .../examples/complete/game/hourly_team_score.py |  19 +-
 .../examples/complete/game/user_score.py        |  15 +-
 .../complete/juliaset/juliaset/juliaset.py      |  44 +-
 .../apache_beam/examples/complete/tfidf.py      |  21 +-
 .../apache_beam/examples/complete/tfidf_test.py |  28 +-
 .../examples/complete/top_wikipedia_sessions.py |  12 +-
 .../complete/top_wikipedia_sessions_test.py     |   9 +-
 .../examples/cookbook/bigquery_schema.py        | 159 +++---
 .../examples/cookbook/bigquery_side_input.py    |  51 +-
 .../cookbook/bigquery_side_input_test.py        |  39 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  33 +-
 .../cookbook/bigquery_tornadoes_test.py         |  19 +-
 .../apache_beam/examples/cookbook/coders.py     |  16 +-
 .../examples/cookbook/coders_test.py            |  14 +-
 .../examples/cookbook/custom_ptransform.py      |  27 +-
 .../examples/cookbook/custom_ptransform_test.py |  11 +-
 .../examples/cookbook/datastore_wordcount.py    |  20 +-
 .../apache_beam/examples/cookbook/filters.py    |  21 +-
 .../examples/cookbook/group_with_coder.py       |  43 +-
 .../examples/cookbook/group_with_coder_test.py  |   4 +-
 .../examples/cookbook/mergecontacts.py          | 115 +++--
 .../examples/cookbook/mergecontacts_test.py     |   3 +-
 .../examples/cookbook/multiple_output_pardo.py  |  72 ++-
 .../cookbook/multiple_output_pardo_test.py      |   2 +-
 .../apache_beam/examples/snippets/snippets.py   | 494 +++++++++----------
 .../examples/snippets/snippets_test.py          | 326 ++++++------
 .../apache_beam/examples/streaming_wordcap.py   |  24 +-
 .../apache_beam/examples/streaming_wordcount.py |  44 +-
 sdks/python/apache_beam/examples/wordcount.py   |   1 -
 .../apache_beam/examples/wordcount_debugging.py |  55 +--
 .../apache_beam/examples/wordcount_minimal.py   |  33 +-
 .../python/apache_beam/io/filebasedsink_test.py |  16 +-
 sdks/python/apache_beam/pipeline.py             |  19 +-
 .../apache_beam/transforms/combiners_test.py    |  58 +--
 .../apache_beam/transforms/window_test.py       | 147 +++---
 .../transforms/write_ptransform_test.py         |   7 +-
 .../typehints/typed_pipeline_test.py            |  22 +-
 41 files changed, 1005 insertions(+), 1111 deletions(-)
----------------------------------------------------------------------