You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:07 UTC

[01/12] incubator-beam git commit: Fix error messages for externally named PTransforms.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 9fe102a5c -> 38d9dea2e


Fix error messages for externally named PTransforms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c186ce4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c186ce4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c186ce4

Branch: refs/heads/python-sdk
Commit: 7c186ce4ce18c98cf3979b6f0f07c72a47b42ec9
Parents: 031c4cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 15:55:26 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c186ce4/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 30ad315..aeed9f9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -201,6 +201,15 @@ class Pipeline(object):
     if not isinstance(transform, ptransform.PTransform):
       raise TypeError("Expected a PTransform object, got %s" % transform)
 
+    if label:
+      # Temporarily set transform.label; some PTransform operations inspect it
+      # (e.g. to produce error messages for type hint violations).
+      try:
+        old_label, transform.label = transform.label, label
+        return self.apply(transform, pvalueish)
+      finally:
+        transform.label = old_label
+
     full_label = '/'.join([self._current_transform().full_label,
                            label or transform.label]).lstrip('/')
     if full_label in self.applied_labels:


[04/12] incubator-beam git commit: Fix multi-input named PTransforms.

Posted by ro...@apache.org.
Fix multi-input named PTransforms.

The naming wrapper now delegates its __ror__ logic entirely to the wrapped transform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e

Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/ptransform.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
     else:
       return NotImplemented
 
-  def __ror__(self, left):
+  def __ror__(self, left, label=None):
     """Used to apply this PTransform to non-PValues, e.g., a tuple."""
     pvalueish, pvalues = self._extract_input_pvalues(left)
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
                     if not isinstance(v, pvalue.PValue) and v is not None}
     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
     self.pipeline = p
-    result = p.apply(self, pvalueish)
+    result = p.apply(self, pvalueish, label)
     if deferred:
       return result
     else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
     super(_NamedPTransform, self).__init__(label)
     self.transform = transform
 
+  def __ror__(self, pvalueish):
+    return self.transform.__ror__(pvalueish, self.label)
+
   def apply(self, pvalue):
     raise RuntimeError("Should never be applied directly.")


[02/12] incubator-beam git commit: Move names out of transform constructors.

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index a747112..0439fe1 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase):
     size = len(vals)
 
     # First for global combines.
-    pcoll = pipeline | Create('start', vals)
-    result_mean = pcoll | combine.Mean.Globally('mean')
-    result_count = pcoll | combine.Count.Globally('count')
+    pcoll = pipeline | 'start' >> Create(vals)
+    result_mean = pcoll | 'mean' >> combine.Mean.Globally()
+    result_count = pcoll | 'count' >> combine.Count.Globally()
     assert_that(result_mean, equal_to([mean]), label='assert:mean')
     assert_that(result_count, equal_to([size]), label='assert:size')
 
     # Again for per-key combines.
-    pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
-    result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
-    result_key_count = pcoll | combine.Count.PerKey('count-perkey')
+    pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals])
+    result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey()
+    result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey()
     assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
     assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
     pipeline.run()
@@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase):
              9: 'nniiinne'}
 
     # First for global combines.
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | combine.Top.Largest('top', 5)
-    result_bot = pcoll | combine.Top.Smallest('bot', 4)
+    pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | 'top' >> combine.Top.Largest(5)
+    result_bot = pcoll | 'bot' >> combine.Top.Smallest(4)
     result_cmp = pcoll | combine.Top.Of(
         'cmp',
         6,
@@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase):
     # Again for per-key combines.
     pcoll = pipeline | Create(
         'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
-    result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
+    result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
+    result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
     result_key_cmp = pcoll | combine.Top.PerKey(
         'cmp-perkey',
         6,
@@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase):
   def test_top_shorthands(self):
     pipeline = Pipeline('DirectPipelineRunner')
 
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5))
-    result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4))
+    pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
+    result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4))
     assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
     assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
     pcoll = pipeline | Create(
         'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5))
+    result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
     result_kbot = pcoll | beam.CombinePerKey(
         'bot-perkey', combine.Smallest(4))
     assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
@@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase):
     # First test global samples (lots of them).
     for ix in xrange(300):
       pipeline = Pipeline('DirectPipelineRunner')
-      pcoll = pipeline | Create('start', [1, 1, 2, 2])
+      pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
       def matcher():
@@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase):
     pcoll = pipeline | Create(
         'start-perkey',
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
-    result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+    result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
     def matcher():
       def match(actual):
@@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (
         p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+        | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
         | beam.CombineGlobally(combine.TupleCombineFn(max,
                                                       combine.MeanCombineFn(),
                                                       sum)).without_defaults())
@@ -179,8 +179,8 @@ class CombineTest(unittest.TestCase):
   def test_to_list_and_to_dict(self):
     pipeline = Pipeline('DirectPipelineRunner')
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    pcoll = pipeline | Create('start', the_list)
-    result = pcoll | combine.ToList('to list')
+    pcoll = pipeline | 'start' >> Create(the_list)
+    result = pcoll | 'to list' >> combine.ToList()
 
     def matcher(expected):
       def match(actual):
@@ -191,8 +191,8 @@ class CombineTest(unittest.TestCase):
 
     pipeline = Pipeline('DirectPipelineRunner')
     pairs = [(1, 2), (3, 4), (5, 6)]
-    pcoll = pipeline | Create('start-pairs', pairs)
-    result = pcoll | combine.ToDict('to dict')
+    pcoll = pipeline | 'start-pairs' >> Create(pairs)
+    result = pcoll | 'to dict' >> combine.ToDict()
 
     def matcher():
       def match(actual):
@@ -221,8 +221,8 @@ class CombineTest(unittest.TestCase):
         return main | Map(lambda _, s: s, side)
 
     p = Pipeline('DirectPipelineRunner')
-    result1 = p | Create('label1', []) | CombineWithSideInput('L1')
-    result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
+    result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
+    result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')
     assert_that(result2, equal_to([10]), label='r2')
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index b288321..44a6d29 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,12 +814,12 @@ class CombineGlobally(PTransform):
         return transform
 
     combined = (pcoll
-                | add_input_types(Map('KeyWithVoid', lambda v: (None, v))
+                | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
                                   .with_output_types(
                                       KV[None, pcoll.element_type]))
                 | CombinePerKey(
                     'CombinePerKey', self.fn, *self.args, **self.kwargs)
-                | Map('UnKey', lambda (k, v): v))
+                | 'UnKey' >> Map(lambda (k, v): v))
 
     if not self.has_defaults and not self.as_view:
       return combined
@@ -851,8 +851,8 @@ class CombineGlobally(PTransform):
         else:
           return transform
       return (pcoll.pipeline
-              | Create('DoOnce', [None])
-              | typed(Map('InjectDefault', lambda _, s: s, view)))
+              | 'DoOnce' >> Create([None])
+              | 'InjectDefault' >> typed(Map(lambda _, s: s, view)))
 
 
 class CombinePerKey(PTransformWithSideInputs):
@@ -1045,9 +1045,9 @@ class GroupByKey(PTransform):
       gbk_output_type = KV[key_type, Iterable[value_type]]
 
       return (pcoll
-              | (ParDo('reify_windows', self.ReifyWindows())
+              | 'reify_windows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | (GroupByKeyOnly('group_by_key')
+              | 'group_by_key' >> (GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
               | (ParDo('group_by_window',
@@ -1056,8 +1056,8 @@ class GroupByKey(PTransform):
                  .with_output_types(gbk_output_type)))
     else:
       return (pcoll
-              | ParDo('reify_windows', self.ReifyWindows())
-              | GroupByKeyOnly('group_by_key')
+              | 'reify_windows' >> ParDo(self.ReifyWindows())
+              | 'group_by_key' >> GroupByKeyOnly()
               | ParDo('group_by_window',
                       self.GroupAlsoByWindow(pcoll.windowing)))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index b652bca..6eb28b0 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -691,7 +691,7 @@ def ptransform_fn(fn):
   With either method the custom PTransform can be used in pipelines as if
   it were one of the "native" PTransforms::
 
-    result_pcoll = input_pcoll | CustomMapper('label', somefn)
+    result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
 
   Note that for both solutions the underlying implementation of the pipe
   operator (i.e., `|`) will inject the pcoll argument in its proper place

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index feb081c..8121c1e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -53,12 +53,12 @@ class PTransformTest(unittest.TestCase):
                      str(PTransform()))
 
     pa = Pipeline('DirectPipelineRunner')
-    res = pa | beam.Create('a_label', [1, 2])
+    res = pa | 'a_label' >> beam.Create([1, 2])
     self.assertEqual('<Create(PTransform) label=[a_label]>',
                      str(res.producer.transform))
 
     pc = Pipeline('DirectPipelineRunner')
-    res = pc | beam.Create('with_inputs', [1, 2])
+    res = pc | 'with_inputs' >> beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
     self.assertEqual(
@@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase):
         str(inputs_tr))
 
     pd = Pipeline('DirectPipelineRunner')
-    res = pd | beam.Create('with_sidei', [1, 2])
+    res = pd | 'with_sidei' >> beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
     self.assertEqual(
@@ -110,8 +110,8 @@ class PTransformTest(unittest.TestCase):
         return [context.element + addon]
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.ParDo('do', AddNDoFn(), 10)
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
@@ -122,21 +122,21 @@ class PTransformTest(unittest.TestCase):
         pass
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
-      pcoll | beam.ParDo('do', MyDoFn)  # Note the lack of ()'s
+      pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.FlatMap('do', lambda x, addon: [x + addon], 10)
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | beam.Create('side', [10])
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    side = pipeline | 'side' >> beam.Create([10])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
         'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
@@ -144,8 +144,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_side_input_as_keyword_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | beam.Create('side', [10])
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    side = pipeline | 'side' >> beam.Create([10])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
         'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
@@ -153,8 +153,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
-    pcoll | beam.FlatMap('do', lambda x: x + '1')
+    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
 
     # Since the DoFn directly returns a string we should get an error warning
     # us.
@@ -167,8 +167,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
-    pcoll | beam.FlatMap('do', lambda x: {x: '1'})
+    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
 
     # Since the DoFn directly returns a dict we should get an error warning
     # us.
@@ -181,9 +181,9 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_side_outputs_maintains_unique_name(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    r1 = pcoll | beam.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
-    r2 = pcoll | beam.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
+    r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
     assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
     assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
     pipeline.run()
@@ -194,8 +194,8 @@ class PTransformTest(unittest.TestCase):
     def incorrect_par_do_fn(x):
       return x + 5
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [2, 9, 3])
-    pcoll | beam.FlatMap('do', incorrect_par_do_fn)
+    pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
+    pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions to a ParDo return
     # an iterable.
     with self.assertRaises(typehints.TypeCheckError) as cm:
@@ -215,8 +215,8 @@ class PTransformTest(unittest.TestCase):
       def finish_bundle(self, c):
         yield 'finish'
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.ParDo('do', MyDoFn())
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.ParDo(MyDoFn())
 
     # May have many bundles, but each has a start and finish.
     def  matcher():
@@ -230,7 +230,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_filter(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3, 4])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
     result = pcoll | beam.Filter(
         'filter', lambda x: x % 2 == 0)
     assert_that(result, equal_to([2, 4]))
@@ -256,15 +256,15 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
-    result = pcoll | beam.CombineGlobally('mean', self._MeanCombineFn())
+    pcoll = pipeline | 'start' >> beam.Create(vals)
+    result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
     pipeline.run()
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
     pipeline.run()
@@ -272,8 +272,8 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', values)
-    divisor = pipeline | beam.Create('divisor', [2])
+    pcoll = pipeline | 'start' >> beam.Create(values)
+    divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombineGlobally(
         'max',
         # Multiples of divisor only.
@@ -287,9 +287,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
-    result = pcoll | beam.CombinePerKey('mean', self._MeanCombineFn())
+    result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
     assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
                                   ('b', sum(vals_2) / len(vals_2))]))
     pipeline.run()
@@ -298,7 +298,7 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
     assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
@@ -308,9 +308,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
-    divisor = pipeline | beam.Create('divisor', [2])
+    divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombinePerKey(
         lambda vals, d: max(v for v in vals if v % d == 0),
         pvalue.AsSingleton(divisor))  # Multiples of divisor only.
@@ -323,7 +323,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
-    result = pcoll | beam.GroupByKey('group')
+    result = pcoll | 'group' >> beam.GroupByKey()
     assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
     pipeline.run()
 
@@ -335,9 +335,9 @@ class PTransformTest(unittest.TestCase):
         return (context.element % 3) + offset
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
-    partitions = pcoll | beam.Partition('part1', SomePartitionFn(), 4, 1)
+    partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
     assert_that(partitions[0], equal_to([]))
     assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
     assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
@@ -347,14 +347,14 @@ class PTransformTest(unittest.TestCase):
     # Check that a bad partition label will yield an error. For the
     # DirectPipelineRunner, this error manifests as an exception.
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
-    partitions = pcoll | beam.Partition('part2', SomePartitionFn(), 4, 10000)
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+    partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
             'part',
@@ -370,49 +370,49 @@ class PTransformTest(unittest.TestCase):
     """Regression test for an issue with how partitions are handled."""
     pipeline = Pipeline('DirectPipelineRunner')
     contents = [('aa', 1), ('bb', 2), ('aa', 2)]
-    created = pipeline | beam.Create('A', contents)
-    partitioned = created | beam.Partition('B', lambda x, n: len(x) % n, 3)
-    flattened = partitioned | beam.Flatten('C')
-    grouped = flattened | beam.GroupByKey('D')
+    created = pipeline | 'A' >> beam.Create(contents)
+    partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
+    flattened = partitioned | 'C' >> beam.Flatten()
+    grouped = flattened | 'D' >> beam.GroupByKey()
     assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
     pipeline.run()
 
   def test_flatten_pcollections(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
-    result = (pcoll_1, pcoll_2) | beam.Flatten('flatten')
+    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
+    result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
     pipeline = Pipeline('DirectPipelineRunner')
     with self.assertRaises(ValueError):
-      () | beam.Flatten('pipeline arg missing')
-    result = () | beam.Flatten('empty', pipeline=pipeline)
+      () | 'pipeline arg missing' >> beam.Flatten()
+    result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
     assert_that(result, equal_to([]))
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
+    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
-              | beam.Flatten('flatten'))
+              | 'flatten' >> beam.Flatten())
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_input_type_must_be_iterable(self):
     # Inputs to flatten *must* be an iterable.
     with self.assertRaises(ValueError):
-      4 | beam.Flatten('flatten')
+      4 | 'flatten' >> beam.Flatten()
 
   def test_flatten_input_type_must_be_iterable_of_pcolls(self):
     # Inputs to flatten *must* be an iterable of PCollections.
     with self.assertRaises(TypeError):
-      {'l': 'test'} | beam.Flatten('flatten')
+      {'l': 'test'} | 'flatten' >> beam.Flatten()
     with self.assertRaises(TypeError):
-      set([1, 2, 3]) | beam.Flatten('flatten')
+      set([1, 2, 3]) | 'flatten' >> beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
     pipeline = Pipeline('DirectPipelineRunner')
@@ -420,7 +420,7 @@ class PTransformTest(unittest.TestCase):
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey('cgbk')
+    result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey()
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -433,7 +433,7 @@ class PTransformTest(unittest.TestCase):
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
     result = ([pc for pc in (pcoll_1, pcoll_2)]
-              | beam.CoGroupByKey('cgbk'))
+              | 'cgbk' >> beam.CoGroupByKey())
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -445,7 +445,7 @@ class PTransformTest(unittest.TestCase):
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey('cgbk')
+    result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey()
     assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
                                   ('b', {'X': [3], 'Y': []}),
                                   ('c', {'X': [4], 'Y': [7, 8]})]))
@@ -453,10 +453,10 @@ class PTransformTest(unittest.TestCase):
 
   def test_group_by_key_input_must_be_kv_pairs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | beam.Create('A', [1, 2, 3, 4, 5])
+    pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      pcolls | beam.GroupByKey('D')
+      pcolls | 'D' >> beam.GroupByKey()
       pipeline.run()
 
     self.assertStartswith(
@@ -466,9 +466,9 @@ class PTransformTest(unittest.TestCase):
 
   def test_group_by_key_only_input_must_be_kv_pairs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | beam.Create('A', ['a', 'b', 'f'])
+    pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | beam.GroupByKeyOnly('D')
+      pcolls | 'D' >> beam.GroupByKeyOnly()
       pipeline.run()
 
     expected_error_prefix = ('Input type hint violation at D: expected '
@@ -506,20 +506,20 @@ class PTransformTest(unittest.TestCase):
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
-    result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
     assert_that(result, equal_to([('a', 2), ('b', 1)]))
     pipeline.run()
 
   def test_apply_to_list(self):
     self.assertItemsEqual(
-        [1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1))
-    self.assertItemsEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2))
+        [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
+    self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
     self.assertItemsEqual([1, 2, 100, 3],
-                          ([1, 2, 3], [100]) | beam.Flatten('flat'))
+                          ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
     self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
-                          join_input | beam.CoGroupByKey('join'))
+                          join_input | 'join' >> beam.CoGroupByKey())
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
@@ -583,7 +583,7 @@ class PTransformLabelsTest(unittest.TestCase):
     gbk = beam.GroupByKey('gbk')
     map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
     t = (map1 | gbk | map2)
-    result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
     self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
     self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
     self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
@@ -592,7 +592,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_custom_transform_without_label(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
     self.assertTrue('CustomTransform' in pipeline.applied_labels)
@@ -602,7 +602,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_custom_transform_with_label(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform('*custom*')
     result = pipeline.apply(custom, pcoll)
     self.assertTrue('*custom*' in pipeline.applied_labels)
@@ -613,7 +613,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
     self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
@@ -622,7 +622,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_ptransform_using_decorator(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     sample = SamplePTransform('*sample*')
     _ = pcoll | sample
     self.assertTrue('*sample*' in pipeline.applied_labels)
@@ -633,7 +633,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally('*sum*', sum)
     result = pcoll | combine
     self.assertTrue('*sum*' in pipeline.applied_labels)
@@ -642,7 +642,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def check_label(self, ptransform, expected_label):
     pipeline = Pipeline('DirectPipelineRunner')
-    pipeline | beam.Create('start', [('a', 1)]) | ptransform
+    pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
     actual_label = sorted(pipeline.applied_labels - {'start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
 
@@ -679,8 +679,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + five]
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.ParDo('add', AddWithFive(), 5))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'add' >> beam.ParDo(AddWithFive(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -694,8 +694,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3]).with_output_types(int)
-       | beam.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
+       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
 
     self.assertEqual("Type hint violation for 'upper': "
                      "requires <type 'str'> but got <type 'int'> for context",
@@ -711,8 +711,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + num]
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.ParDo('add', AddWithNum(), 5))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'add' >> beam.ParDo(AddWithNum(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -728,8 +728,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', ['1', '2', '3']).with_output_types(str)
-       | beam.ParDo('add', AddWithNum(), 5))
+       | 't' >> beam.Create(['1', '2', '3']).with_output_types(str)
+       | 'add' >> beam.ParDo(AddWithNum(), 5))
       self.p.run()
 
     self.assertEqual("Type hint violation for 'add': "
@@ -746,8 +746,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # will receive a str instead, which should result in a raised exception.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', ['b', 'a', 'r']).with_output_types(str)
-       | beam.FlatMap('to str', int_to_str))
+       | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
+       | 'to str' >> beam.FlatMap(int_to_str))
 
     self.assertEqual("Type hint violation for 'to str': "
                      "requires <type 'int'> but got <type 'str'> for a",
@@ -761,8 +761,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks than no error should be raised.
     d = (self.p
-         | beam.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.FlatMap('case', to_all_upper_case))
+         | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'case' >> beam.FlatMap(to_all_upper_case))
     assert_that(d, equal_to(['T', 'E', 'S', 'T']))
     self.p.run()
 
@@ -775,10 +775,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # expecting pcoll's of type str instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-       | (beam.FlatMap('score', lambda x: [1] if x == 't' else [2])
+       | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
           .with_input_types(str).with_output_types(int))
-       | (beam.FlatMap('upper', lambda x: [x.upper()])
+       | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -788,10 +788,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_pardo_properly_type_checks_using_type_hint_methods(self):
     # Pipeline should be created successfully without an error
     d = (self.p
-         | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.FlatMap('dup', lambda x: [x + x])
+         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'dup' >> beam.FlatMap(lambda x: [x + x])
          .with_input_types(str).with_output_types(str)
-         | beam.FlatMap('upper', lambda x: [x.upper()])
+         | 'upper' >> beam.FlatMap(lambda x: [x.upper()])
          .with_input_types(str).with_output_types(str))
 
     assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
@@ -802,8 +802,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # int's, while Map is expecting one of str.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Map('upper', lambda x: x.upper())
+       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'upper' >> beam.Map(lambda x: x.upper())
        .with_input_types(str).with_output_types(str))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -813,8 +813,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_map_properly_type_checks_using_type_hints_methods(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-         | beam.Map('to_str', lambda x: str(x))
+         | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+         | 'to_str' >> beam.Map(lambda x: str(x))
          .with_input_types(int).with_output_types(str))
     assert_that(d, equal_to(['1', '2', '3', '4']))
     self.p.run()
@@ -829,8 +829,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # However, 'Map' should detect that Create has hinted an int instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Map('upper', upper))
+       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'upper' >> beam.Map(upper))
 
     self.assertEqual("Type hint violation for 'upper': "
                      "requires <type 'str'> but got <type 'int'> for s",
@@ -844,8 +844,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks than no error should be raised.
     d = (self.p
-         | beam.Create('bools', [True, False, True]).with_output_types(bool)
-         | beam.Map('to_ints', bool_to_int))
+         | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
+         | 'to_ints' >> beam.Map(bool_to_int))
     assert_that(d, equal_to([1, 0, 1]))
     self.p.run()
 
@@ -854,10 +854,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # incoming.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-       | beam.Map('lower', lambda x: x.lower())
+       | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+       | 'lower' >> beam.Map(lambda x: x.lower())
        .with_input_types(str).with_output_types(str)
-       | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+       | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
 
     self.assertEqual("Type hint violation for 'below 3': "
                      "requires <type 'int'> but got <type 'str'> for x",
@@ -866,10 +866,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_filter_type_checks_using_type_hints_method(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-         | beam.Map('to int', lambda x: int(x))
+         | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+         | 'to int' >> beam.Map(lambda x: int(x))
          .with_input_types(str).with_output_types(int)
-         | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+         | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
     assert_that(d, equal_to([1, 2]))
     self.p.run()
 
@@ -881,8 +881,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Func above was hinted to only take a float, yet an int will be passed.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('ints', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Filter('half', more_than_half))
+       | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'half' >> beam.Filter(more_than_half))
 
     self.assertEqual("Type hint violation for 'half': "
                      "requires <type 'float'> but got <type 'int'> for a",
@@ -896,17 +896,17 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # Filter should deduce that it returns the same type that it takes.
     (self.p
-     | beam.Create('str', range(5)).with_output_types(int)
-     | beam.Filter('half', half)
-     | beam.Map('to bool', lambda x: bool(x))
+     | 'str' >> beam.Create(range(5)).with_output_types(int)
+     | 'half' >> beam.Filter(half)
+     | 'to bool' >> beam.Map(lambda x: bool(x))
      .with_input_types(int).with_output_types(bool))
 
   def test_group_by_key_only_output_type_deduction(self):
     d = (self.p
-         | beam.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
-         | (beam.Map('pair', lambda x: (x, ord(x)))
+         | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | beam.GroupByKeyOnly('O'))
+         | 'O' >> beam.GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -915,10 +915,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_group_by_key_output_type_deduction(self):
     d = (self.p
-         | beam.Create('str', range(20)).with_output_types(int)
-         | (beam.Map('pair negative', lambda x: (x % 5, -x))
+         | 'str' >> beam.Create(range(20)).with_output_types(int)
+         | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
             .with_output_types(typehints.KV[int, int]))
-         | beam.GroupByKey('T'))
+         | 'T' >> beam.GroupByKey())
 
     # Output type should correctly be deduced.
     # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +929,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # GBK will be passed raw int's here instead of some form of KV[A, B].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3]).with_output_types(int)
-       | beam.GroupByKeyOnly('F'))
+       | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'F' >> beam.GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +942,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # aliased to Tuple[int, str].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | (beam.Create('s', range(5))
+       | 's' >> (beam.Create(range(5))
           .with_output_types(typehints.Iterable[int]))
-       | beam.GroupByKey('T'))
+       | 'T' >> beam.GroupByKey())
 
     self.assertEqual("Input type hint violation at T: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -958,8 +958,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to the ParDo.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('nums', range(5))
-       | beam.FlatMap('mod dup', lambda x: (x % 2, x)))
+       | 'nums' >> beam.Create(range(5))
+       | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x)))
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform Create(nums)',
@@ -971,9 +971,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to GBK-only.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('nums', range(5)).with_output_types(int)
-       | beam.Map('mod dup', lambda x: (x % 2, x))
-       | beam.GroupByKeyOnly('G'))
+       | 'nums' >> beam.Create(range(5)).with_output_types(int)
+       | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
+       | 'G' >> beam.GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -986,8 +986,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # The pipeline below should raise a TypeError, however pipeline type
     # checking was disabled above.
     (self.p
-     | beam.Create('t', [1, 2, 3]).with_output_types(int)
-     | beam.Map('lower', lambda x: x.lower())
+     | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+     | 'lower' >> beam.Map(lambda x: x.lower())
      .with_input_types(str).with_output_types(str))
 
   def test_run_time_type_checking_enabled_type_violation(self):
@@ -1002,8 +1002,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Function above has been type-hinted to only accept an int. But in the
     # pipeline execution it'll be passed a string due to the output of Create.
     (self.p
-     | beam.Create('t', ['some_string'])
-     | beam.Map('to str', int_to_string))
+     | 't' >> beam.Create(['some_string'])
+     | 'to str' >> beam.Map(int_to_string))
     with self.assertRaises(typehints.TypeCheckError) as e:
       self.p.run()
 
@@ -1026,10 +1026,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Pipeline checking is off, but the above function should satisfy types at
     # run-time.
     result = (self.p
-              | beam.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g'])
+              | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
               .with_output_types(str)
-              | beam.Map('gen keys', group_with_upper_ord)
-              | beam.GroupByKey('O'))
+              | 'gen keys' >> beam.Map(group_with_upper_ord)
+              | 'O' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(1, ['g']),
                                   (3, ['s', 'i', 'n']),
@@ -1048,9 +1048,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2, a)
 
     (self.p
-     | beam.Create('nums', range(5)).with_output_types(int)
-     | beam.Map('is even', is_even_as_key)
-     | beam.GroupByKey('parity'))
+     | 'nums' >> beam.Create(range(5)).with_output_types(int)
+     | 'is even' >> beam.Map(is_even_as_key)
+     | 'parity' >> beam.GroupByKey())
 
     # Although all the types appear to be correct when checked at pipeline
     # construction. Runtime type-checking should detect the 'is_even_as_key' is
@@ -1077,9 +1077,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2 == 0, a)
 
     result = (self.p
-              | beam.Create('nums', range(5)).with_output_types(int)
-              | beam.Map('is even', is_even_as_key)
-              | beam.GroupByKey('parity'))
+              | 'nums' >> beam.Create(range(5)).with_output_types(int)
+              | 'is even' >> beam.Map(is_even_as_key)
+              | 'parity' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
     self.p.run()
@@ -1092,8 +1092,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # ParDo should receive an instance of type 'str', however an 'int' will be
     # passed instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('n', [1, 2, 3])
-       | (beam.FlatMap('to int', lambda x: [int(x)])
+      (self.p | 'n' >> beam.Create([1, 2, 3])
+       | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
           .with_input_types(str).with_output_types(int))
       )
       self.p.run()
@@ -1111,8 +1111,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (beam.FlatMap('add', lambda (x, y): [x + y])
+       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
@@ -1136,8 +1136,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         lambda x: [float(x)]).with_input_types(int).with_output_types(
             int).get_type_hints()
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('n', [1, 2, 3])
-       | (beam.FlatMap('to int', lambda x: [float(x)])
+      (self.p | 'n' >> beam.Create([1, 2, 3])
+       | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
       self.p.run()
@@ -1159,8 +1159,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # of 'int' will be generated instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (beam.FlatMap('swap', lambda (x, y): [x + y])
+       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )
@@ -1183,7 +1183,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return a + b
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('t', [1, 2, 3, 4]) | beam.Map('add 1', add, 1.0))
+      (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
       self.p.run()
 
     self.assertStartswith(
@@ -1199,8 +1199,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3, 4])
-       | (beam.Map('add 1', lambda x, one: x + one, 1.0)
+       | 't' >> beam.Create([1, 2, 3, 4])
+       | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
           .with_input_types(int, int)
           .with_output_types(float)))
       self.p.run()
@@ -1219,8 +1219,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return sum(ints)
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.CombineGlobally('sum', sum_ints))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'sum' >> beam.CombineGlobally(sum_ints))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([6]))
@@ -1234,8 +1234,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('m', [1, 2, 3]).with_output_types(int)
-       | beam.CombineGlobally('add', bad_combine))
+       | 'm' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'add' >> beam.CombineGlobally(bad_combine))
 
     self.assertEqual(
         "All functions for a Combine PTransform must accept a "
@@ -1255,9 +1255,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return list(range(n+1))
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.CombineGlobally('sum', sum_ints)
-         | beam.ParDo('range', range_from_zero))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'sum' >> beam.CombineGlobally(sum_ints)
+         | 'range' >> beam.ParDo(range_from_zero))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
@@ -1272,8 +1272,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return reduce(operator.mul, ints, 1)
 
     d = (self.p
-         | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
-         | beam.CombineGlobally('mul', iter_mul))
+         | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+         | 'mul' >> beam.CombineGlobally(iter_mul))
 
     assert_that(d, equal_to([625]))
     self.p.run()
@@ -1290,8 +1290,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
-       | beam.CombineGlobally('mul', iter_mul))
+       | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+       | 'mul' >> beam.CombineGlobally(iter_mul))
       self.p.run()
 
     self.assertStartswith(
@@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_combine_pipeline_type_check_using_methods(self):
     d = (self.p
-         | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | (beam.CombineGlobally('concat', lambda s: ''.join(s))
+         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
             .with_input_types(str).with_output_types(str)))
 
     def matcher(expected):
@@ -1321,8 +1321,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('s', range(5)).with_output_types(int)
-         | (beam.CombineGlobally('sum', lambda s: sum(s))
+         | 's' >> beam.Create(range(5)).with_output_types(int)
+         | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
             .with_input_types(int).with_output_types(int)))
 
     assert_that(d, equal_to([10]))
@@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_combine_pipeline_type_check_violation_using_methods(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1345,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
 
@@ -1363,9 +1363,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
-       | beam.Map('f', lambda x: x + 1))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | 'f' >> beam.Map(lambda x: x + 1))
 
     self.assertEqual(
         'Pipeline type checking is enabled, '
@@ -1375,8 +1375,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_globally_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
@@ -1385,8 +1385,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_globally_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', ['test']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
+       | 'c' >> beam.Create(['test']).with_output_types(str)
+       | 'mean' >> combine.Mean.Globally())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1398,8 +1398,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
@@ -1411,8 +1411,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
+       | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | 'mean' >> combine.Mean.Globally())
       self.p.run()
       self.assertEqual("Runtime type violation detected for transform input "
                        "when executing ParDoFlatMap(Combine): Tuple[Any, "
@@ -1427,10 +1427,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | (beam.Map('even group', lambda x: (not x % 2, x))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('even mean'))
+         | 'even mean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1439,10 +1439,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', map(str, range(5))).with_output_types(str)
-       | (beam.Map('upper pair', lambda x: (x.upper(), x))
+       | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
+       | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
           .with_output_types(typehints.KV[str, str]))
-       | combine.Mean.PerKey('even mean'))
+       | 'even mean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
@@ -1455,10 +1455,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | (beam.Map('odd group', lambda x: (bool(x % 2), x))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('odd mean'))
+         | 'odd mean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1470,10 +1470,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', range(5)).with_output_types(int)
-       | (beam.Map('odd group', lambda x: (x, str(bool(x % 2))))
+       | 'c' >> beam.Create(range(5)).with_output_types(int)
+       | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
           .with_output_types(typehints.KV[int, str]))
-       | combine.Mean.PerKey('odd mean'))
+       | 'odd mean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertStartswith(
@@ -1494,8 +1494,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'count int' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1505,8 +1505,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'count int' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1514,10 +1514,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | (beam.Map('even group', lambda x: (not x % 2, x))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Count.PerKey('count int'))
+         | 'count int' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 2), (True, 3)]))
@@ -1526,8 +1526,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perkey_pipeline_type_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('p', range(5)).with_output_types(int)
-       | combine.Count.PerKey('count int'))
+       | 'p' >> beam.Create(range(5)).with_output_types(int)
+       | 'count int' >> combine.Count.PerKey())
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1538,10 +1538,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.Map('dup key', lambda x: (x, x))
+         | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'dup key' >> beam.Map(lambda x: (x, x))
          .with_output_types(typehints.KV[str, str])
-         | combine.Count.PerKey('count dups'))
+         | 'count dups' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[str, int], d.element_type)
     assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
@@ -1549,8 +1549,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perelement_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('w', [1, 1, 2, 3]).with_output_types(int)
-         | combine.Count.PerElement('count elems'))
+         | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+         | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[int, int], d.element_type)
     assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
@@ -1561,8 +1561,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('f', [1, 1, 2, 3])
-       | combine.Count.PerElement('count elems'))
+       | 'f' >> beam.Create([1, 1, 2, 3])
+       | 'count elems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1573,9 +1573,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('w', [True, True, False, True, True])
+         | 'w' >> beam.Create([True, True, False, True, True])
          .with_output_types(bool)
-         | combine.Count.PerElement('count elems'))
+         | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 1), (True, 4)]))
@@ -1583,8 +1583,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_top_of_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('n', range(5, 11)).with_output_types(int)
-         | combine.Top.Of('top 3', 3, lambda x, y: x < y))
+         | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+         | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[int],
                           d.element_type)
@@ -1595,8 +1595,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('n', list('testing')).with_output_types(str)
-         | combine.Top.Of('acii top', 3, lambda x, y: x < y))
+         | 'n' >> beam.Create(list('testing')).with_output_types(str)
+         | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[str], d.element_type)
     assert_that(d, equal_to([['t', 't', 's']]))
@@ -1605,9 +1605,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', range(100)).with_output_types(int)
-       | beam.Map('num + 1', lambda x: x + 1).with_output_types(int)
-       | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+       | 'n' >> beam.Create(range(100)).with_output_types(int)
+       | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
+       | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1616,10 +1616,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('n', range(100)).with_output_types(int)
-         | (beam.Map('group mod 3', lambda x: (x % 3, x))
+         | 'n' >> beam.Create(range(100)).with_output_types(int)
+         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1630,10 +1630,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('n', range(21))
-         | (beam.Map('group mod 3', lambda x: (x % 3, x))
+         | 'n' >> beam.Create(range(21))
+         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1642,8 +1642,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_globally_pipeline_satisfied(self):
     d = (self.p
-         | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 3))
+         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | 'sample' >> combine.Sample.FixedSizeGlobally(3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
 
@@ -1658,8 +1658,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 2))
+         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | 'sample' >> combine.Sample.FixedSizeGlobally(2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
 
@@ -1672,9 +1672,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_per_key_pipeline_satisfied(self):
     d = (self.p
-         | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 2))
+         | 'sample' >> combine.Sample.FixedSizePerKey(2))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1691,9 +1691,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 1))
+         | 'sample' >> combine.Sample.FixedSizePerKey(1))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1708,8 +1708,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_to_list_pipeline_check_satisfied(self):
     d = (self.p
-         | beam.Create('c', (1, 2, 3, 4)).with_output_types(int)
-         | combine.ToList('to list'))
+         | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
+         | 'to list' >> combine.ToList())
 
     self.assertCompatible(typehints.List[int], d.element_type)
 
@@ -1724,8 +1724,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', list('test')).with_output_types(str)
-         | combine.ToList('to list'))
+         | 'c' >> beam.Create(list('test')).with_output_types(str)
+         | 'to list' >> combine.ToList())
 
     self.assertCompatible(typehints.List[str], d.element_type)
 
@@ -1739,8 +1739,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('d', [1, 2, 3, 4]).with_output_types(int)
-       | combine.ToDict('to dict'))
+       | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'to dict' >> combine.ToDict())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1753,7 +1753,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | beam.Create(
              'd',
              [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
-         | combine.ToDict('to dict'))
+         | 'to dict' >> combine.ToDict())
 
     self.assertCompatible(typehints.Dict[int, int], d.element_type)
     assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1763,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | (beam.Create('d', [('1', 2), ('3', 4)])
+         | 'd' >> (beam.Create([('1', 2), ('3', 4)])
             .with_output_types(typehints.Tuple[str, int]))
-         | combine.ToDict('to dict'))
+         | 'to dict' >> combine.ToDict())
 
     self.assertCompatible(typehints.Dict[str, int], d.element_type)
     assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,8 +1776,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(TypeError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3]).with_output_types(int)
-       | beam.Map('len', lambda x: len(x)).with_output_types(int))
+       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
       self.p.run()
 
     # Our special type-checking related TypeError shouldn't have been raised.
@@ -1799,8 +1799,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
-    created = self.p | beam.Create('c', ['a', 'b', 'c'])
-    mapped = created | beam.Map('pair with 1', lambda x: (x, 1))
+    created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+    mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
     grouped = mapped | beam.GroupByKey()
     self.assertEqual(str, created.element_type)
     self.assertEqual(typehints.KV[str, int], mapped.element_type)
@@ -1810,8 +1810,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_inferred_bad_kv_type(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       _ = (self.p
-           | beam.Create('t', ['a', 'b', 'c'])
-           | beam.Map('ungroupable', lambda x: (x, 0, 1.0))
+           | 't' >> beam.Create(['a', 'b', 'c'])
+           | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
            | beam.GroupByKey())
 
     self.assertEqual('Input type hint violation at GroupByKey: '
@@ -1821,11 +1821,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_type_inference_command_line_flag_toggle(self):
     self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | beam.Create('t', [1, 2, 3, 4])
+    x = self.p | 't' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
     self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | beam.Create('m', [1, 2, 3, 4])
+    x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index b7a121d..bbb7787 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -160,9 +160,9 @@ def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
 def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
   """Produces a PCollection containing the unique elements of a PCollection."""
   return (pcoll
-          | Map('ToPairs', lambda v: (v, None))
-          | CombinePerKey('Group', lambda vs: None)
-          | Keys('RemoveDuplicates'))
+          | 'ToPairs' >> Map(lambda v: (v, None))
+          | 'Group' >> CombinePerKey(lambda vs: None)
+          | 'RemoveDuplicates' >> Keys())
 
 
 class DataflowAssertException(Exception):
@@ -220,7 +220,7 @@ def assert_that(actual, matcher, label='assert_that'):
   class AssertThat(PTransform):
 
     def apply(self, pipeline):
-      return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual))
+      return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
 
     def default_label(self):
       return label

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 186fcd4..012dde4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -127,14 +127,14 @@ class WindowTest(unittest.TestCase):
     self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))
 
   def timestamped_key_values(self, pipeline, key, *timestamps):
-    return (pipeline | Create('start', timestamps)
+    return (pipeline | 'start' >> Create(timestamps)
             | Map(lambda x: WindowedValue((key, x), x, [])))
 
   def test_sliding_windows(self):
     p = Pipeline('DirectPipelineRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
     result = (pcoll
-              | WindowInto('w', SlidingWindows(period=2, size=4))
+              | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
               | GroupByKey()
               | reify_windows)
     expected = [('key @ [-2.0, 2.0)', [1]),
@@ -147,7 +147,7 @@ class WindowTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
     result = (pcoll
-              | WindowInto('w', Sessions(10))
+              | 'w' >> WindowInto(Sessions(10))
               | GroupByKey()
               | sort_values
               | reify_windows)
@@ -159,9 +159,9 @@ class WindowTest(unittest.TestCase):
   def test_timestamped_value(self):
     p = Pipeline('DirectPipelineRunner')
     result = (p
-              | Create('start', [(k, k) for k in range(10)])
+              | 'start' >> Create([(k, k) for k in range(10)])
               | Map(lambda (x, t): TimestampedValue(x, t))
-              | WindowInto('w', FixedWindows(5))
+              | 'w' >> WindowInto(FixedWindows(5))
               | Map(lambda v: ('key', v))
               | GroupByKey())
     assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
@@ -172,13 +172,13 @@ class WindowTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (p
               # Create some initial test values.
-              | Create('start', [(k, k) for k in range(10)])
+              | 'start' >> Create([(k, k) for k in range(10)])
               # The purpose of the WindowInto transform is to establish a
               # FixedWindows windowing function for the PCollection.
               # It does not bucket elements into windows since the timestamps
               # from Create are not spaced 5 ms apart and very likely they all
               # fall into the same window.
-              | WindowInto('w', FixedWindows(5))
+              | 'w' >> WindowInto(FixedWindows(5))
               # Generate timestamped values using the values as timestamps.
               # Now there are values 5 ms apart and since Map propagates the
               # windowing function from input to output the output PCollection

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 51163bc..af3668c 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -100,7 +100,7 @@ class WriteTest(unittest.TestCase):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
     p = Pipeline(options=PipelineOptions([]))
-    result = p | beam.Create('start', data) | write_to_test_sink
+    result = p | 'start' >> beam.Create(data) | write_to_test_sink
 
     assert_that(result, is_empty())
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index b25f231..4e1ab68 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -80,7 +80,7 @@ class MainInputTest(unittest.TestCase):
       ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
 
     with self.assertRaises(typehints.TypeCheckError):
-      [1, 2, 3] | (beam.ParDo(MyDoFn()) | beam.ParDo('again', MyDoFn()))
+      [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
 
   def test_typed_dofn_instance(self):
     class MyDoFn(beam.DoFn):
@@ -95,7 +95,7 @@ class MainInputTest(unittest.TestCase):
       ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
 
     with self.assertRaises(typehints.TypeCheckError):
-      [1, 2, 3] | (beam.ParDo(my_do_fn) | beam.ParDo('again', my_do_fn))
+      [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
 
 
 class SideInputTest(unittest.TestCase):
@@ -170,14 +170,14 @@ class SideInputTest(unittest.TestCase):
       return s * times
     p = beam.Pipeline(options=PipelineOptions([]))
     main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | beam.Create('side', [3])
+    side_input = p | 'side' >> beam.Create([3])
     result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
     assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
     p.run()
 
-    bad_side_input = p | beam.Create('bad_side', ['z'])
+    bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | beam.Map('again', repeat, pvalue.AsSingleton(bad_side_input))
+      main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
 
   def test_deferred_side_input_iterable(self):
     @typehints.with_input_types(str, typehints.Iterable[str])
@@ -185,14 +185,14 @@ class SideInputTest(unittest.TestCase):
       return glue.join(sorted(items))
     p = beam.Pipeline(options=PipelineOptions([]))
     main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | beam.Create('side', ['x', 'y', 'z'])
+    side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
     result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
     assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
     p.run()
 
-    bad_side_input = p | beam.Create('bad_side', [1, 2, 3])
+    bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | beam.Map('fail', concat, pvalue.AsIter(bad_side_input))
+      main_input | 'fail' >> beam.Map(concat, pvalue.AsIter(bad_side_input))
 
 
 class CustomTransformTest(unittest.TestCase):


[11/12] incubator-beam git commit: Final cleanup pass.

Posted by ro...@apache.org.
Final cleanup pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe

Branch: refs/heads/python-sdk
Commit: e3c078fe28553b7e7317316b6df51b4c570573ba
Parents: c5b5b14
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:32:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py            |  4 ++--
 .../examples/complete/autocomplete_test.py       |  4 ++--
 .../apache_beam/examples/complete/estimate_pi.py |  5 ++---
 .../examples/complete/top_wikipedia_sessions.py  |  4 ++--
 .../complete/top_wikipedia_sessions_test.py      |  2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py  | 12 +++++-------
 .../examples/cookbook/custom_ptransform.py       |  9 +++++----
 .../examples/cookbook/custom_ptransform_test.py  |  2 +-
 .../apache_beam/examples/cookbook/filters.py     |  4 ++--
 .../examples/cookbook/group_with_coder.py        |  9 +++++----
 .../examples/cookbook/multiple_output_pardo.py   | 14 +++++++-------
 .../apache_beam/examples/snippets/snippets.py    | 19 +++++++++----------
 .../apache_beam/examples/streaming_wordcap.py    |  4 ++--
 13 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 10d9009..c3cd88f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -48,8 +48,8 @@ def run(argv=None):
    | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
    | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'TopPerPrefix' >> TopPerPrefix(5)
-   | beam.Map('format',
-              lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+   | 'format' >> beam.Map(
+       lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
    | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 18d0511..0d20482 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
 
   def test_top_prefixes(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | 'create' >> beam.Create(self.WORDS)
-    result = words | 'test' >> autocomplete.TopPerPrefix(5)
+    words = p | beam.Create(self.WORDS)
+    result = words | autocomplete.TopPerPrefix(5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
     assert_that(result, equal_to(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c33db1d..37c1aad 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -112,9 +112,8 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'Estimate' >> EstimatePiTransform()
-   | beam.io.Write('Write',
-                   beam.io.TextFileSink(known_args.output,
+   | EstimatePiTransform()
+   | beam.io.Write(beam.io.TextFileSink(known_args.output,
                                         coder=JsonCoder())))
 
   # Actually run the pipeline (all operations above are deferred).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7468484..a48a383 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -168,9 +168,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+   | beam.Read(beam.io.TextFileSource(known_args.input))
    | ComputeTopSessions(known_args.sampling_threshold)
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index 207d6c4..a84cc78 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
 
   def test_compute_top_sessions(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    edits = p | 'create' >> beam.Create(self.EDITS)
+    edits = p | beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
     beam.assert_that(result, beam.equal_to(self.EXPECTED))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index f7070dc..a076a0c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -54,8 +54,8 @@ def run(argv=None):
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read(
-      'read', beam.io.TextFileSource(known_args.input,
-                                     coder=beam.coders.BytesCoder()))
+      beam.io.TextFileSource(known_args.input,
+                             coder=beam.coders.BytesCoder()))
 
   # Count the occurrences of each word.
   output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
@@ -76,16 +76,14 @@ def run(argv=None):
                   | 'input-csum' >> beam.Map(crc32line)
                   | 'combine-input-csum' >> beam.CombineGlobally(sum)
                   | 'hex-format' >> beam.Map(lambda x: '%x' % x))
-    input_csum | beam.io.Write(
-        'write-input-csum',
+    input_csum | 'write-input-csum' >> beam.io.Write(
         beam.io.TextFileSink(known_args.checksum_output + '-input'))
 
     output_csum = (output
                    | 'output-csum' >> beam.Map(crc32line)
                    | 'combine-output-csum' >> beam.CombineGlobally(sum)
                    | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
-    output_csum | beam.io.Write(
-        'write-output-csum',
+    output_csum | 'write-output-csum' >> beam.io.Write(
         beam.io.TextFileSink(known_args.checksum_output + '-output'))
 
   # Actually run the pipeline (all operations above are deferred).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 021eff6..ca13bbf 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | 'Init' >> beam.Map(lambda v: (v, 1))
+        | 'ParWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 
 
@@ -47,7 +47,8 @@ def run_count1(known_args, options):
   """Runs the first example pipeline."""
   logging.info('Running first pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input)) | Count1()
+  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | Count1()
    | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
@@ -57,7 +58,7 @@ def Count2(pcoll):  # pylint: disable=invalid-name
   """Count as a decorated function."""
   return (
       pcoll
-      | 'Init' >> beam.Map(lambda v: (v, 1))
+      | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
       | beam.CombinePerKey(sum))
 
 
@@ -84,7 +85,7 @@ def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
   """
   return (
       pcoll
-      | 'Init' >> beam.Map(lambda v: (v, factor))
+      | 'PairWithOne' >> beam.Map(lambda v: (v, factor))
       | beam.CombinePerKey(sum))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 603742f..806b031 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
 
   def run_pipeline(self, count_implementation, factor=1):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(
         result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index efd0ba7..b3a969a 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -67,8 +67,8 @@ def filter_cold_days(input_data, month_filter):
   return (
       fields_of_interest
       | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
-      | beam.Filter('below mean',
-                    lambda row, mean: row['mean_temp'] < mean, global_mean))
+      | 'below mean' >> beam.Filter(
+          lambda row, mean: row['mean_temp'] < mean, global_mean))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 6c86f61..651a4f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,20 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | beam.io.Read(beam.io.TextFileSource(known_args.input))
    # The get_players function is annotated with a type hint above, so the type
    # system knows the output type of the following operation is a key-value pair
    # of a Player and an int. Please see the documentation for details on
    # types that are inferred automatically as well as other ways to specify
    # type hints.
-   | 'get players' >> beam.Map(get_players)
+   | beam.Map(get_players)
    # The output type hint of the previous step is used to infer that the key
    # type of the following operation is the Player type. Since a custom coder
    # is registered for the Player class above, a PlayerCoder will be used to
    # encode Player objects as keys for this combine operation.
-   | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | beam.CombinePerKey(sum)
+   | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 187d20b..d24170e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -137,7 +137,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -158,20 +158,20 @@ def run(argv=None):
    | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
    | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
-   | beam.Write('write chars',
-                beam.io.TextFileSink(known_args.output + '-chars')))
+   | 'write chars' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-chars')))
 
   # pylint: disable=expression-not-assigned
   (short_words
    | 'count short words' >> CountWords()
-   | beam.Write('write short words',
-                beam.io.TextFileSink(known_args.output + '-short-words')))
+   | 'write short words' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-short-words')))
 
   # pylint: disable=expression-not-assigned
   (words
    | 'count words' >> CountWords()
-   | beam.Write('write words',
-                beam.io.TextFileSink(known_args.output + '-words')))
+   | 'write words' >> beam.Write(
+       beam.io.TextFileSink(known_args.output + '-words')))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9f3d6e1..891f464 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -105,9 +105,8 @@ def construct_pipeline(renames):
 
   # [START pipelines_constructing_writing]
   filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
-  filtered_words | beam.io.Write('WriteMyFile',
-                                 beam.io.TextFileSink(
-                                     'gs://some/outputData.txt'))
+  filtered_words | 'WriteMyFile' >> beam.io.Write(
+      beam.io.TextFileSink('gs://some/outputData.txt'))
   # [END pipelines_constructing_writing]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -242,8 +241,8 @@ def pipeline_options_remote(argv):
   options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
   p = Pipeline(options=options)
 
-  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | beam.io.Write(beam.io.TextFileSink(my_output))
 
   p.run()
 
@@ -283,8 +282,8 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
-  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | beam.io.Write(beam.io.TextFileSink(my_output))
   p.run()
 
 
@@ -344,8 +343,8 @@ def pipeline_logging(lines, output):
   p = beam.Pipeline(options=PipelineOptions())
   (p
    | beam.Create(lines)
-   | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
-   | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
+   | beam.ParDo(ExtractWordsFn())
+   | beam.io.Write(beam.io.TextFileSink(output)))
 
   p.run()
 
@@ -1160,6 +1159,6 @@ class Count(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | 'Init' >> beam.Map(lambda v: (v, 1))
+        | 'PairWithOne' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index ef95a5f..d25ec3e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -45,7 +45,7 @@ def run(argv=None):
 
   # Read the text file[pattern] into a PCollection.
   lines = p | beam.io.Read(
-      'read', beam.io.PubSubSource(known_args.input_topic))
+      beam.io.PubSubSource(known_args.input_topic))
 
   # Capitalize the characters in each line.
   transformed = (lines
@@ -54,7 +54,7 @@ def run(argv=None):
   # Write to PubSub.
   # pylint: disable=expression-not-assigned
   transformed | beam.io.Write(
-      'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
+      beam.io.PubSubSink(known_args.output_topic))
 
   p.run()
 


[12/12] incubator-beam git commit: Closes #718

Posted by ro...@apache.org.
Closes #718


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38d9dea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38d9dea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38d9dea2

Branch: refs/heads/python-sdk
Commit: 38d9dea2e62af280e4b9c258cedee70d6bcaa8ca
Parents: 9fe102a e3c078f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Jul 23 16:43:47 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:47 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  99 ++--
 .../examples/complete/autocomplete.py           |  16 +-
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi.py            |  14 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/juliaset/juliaset/juliaset.py      |  11 +-
 .../apache_beam/examples/complete/tfidf.py      |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../examples/cookbook/bigquery_schema.py        |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |  18 +-
 .../cookbook/bigquery_side_input_test.py        |  12 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  10 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  31 +-
 .../apache_beam/examples/cookbook/coders.py     |   2 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   9 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py    |  14 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../examples/cookbook/group_with_coder.py       |   9 +-
 .../examples/cookbook/mergecontacts.py          |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  30 +-
 .../apache_beam/examples/snippets/snippets.py   |  69 ++-
 .../examples/snippets/snippets_test.py          |  16 +-
 .../apache_beam/examples/streaming_wordcap.py   |   6 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  16 +-
 .../apache_beam/examples/wordcount_debugging.py |  18 +-
 .../apache_beam/examples/wordcount_minimal.py   |  16 +-
 sdks/python/apache_beam/io/avroio.py            |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/pipeline.py             |  13 +
 sdks/python/apache_beam/pipeline_test.py        |  48 +-
 sdks/python/apache_beam/pvalue_test.py          |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py    |  64 ++-
 sdks/python/apache_beam/transforms/core.py      |  21 +-
 .../python/apache_beam/transforms/ptransform.py |   9 +-
 .../apache_beam/transforms/ptransform_test.py   | 521 ++++++++++---------
 sdks/python/apache_beam/transforms/util.py      |   9 +-
 .../apache_beam/transforms/window_test.py       |  14 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |  16 +-
 49 files changed, 628 insertions(+), 615 deletions(-)
----------------------------------------------------------------------



[07/12] incubator-beam git commit: Cleanup and fix combiners_test.

Posted by ro...@apache.org.
Cleanup and fix combiners_test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510

Branch: refs/heads/python-sdk
Commit: 01830510bded3c22ddd96937f5d83547702a4385
Parents: 7c186ce
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 16:05:23 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/combiners_test.py    | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 0439fe1..c970382 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase):
     assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
 
     # Again for per-key combines.
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    pcoll = pipeline | 'start-perkye' >> Create(
+        [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
     result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
     result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
-    result_key_cmp = pcoll | combine.Top.PerKey(
-        'cmp-perkey',
+    result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey(
         6,
         lambda a, b, names: len(names[a]) < len(names[b]),
         names)  # Note parameter passed to comparator.
@@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase):
     assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
     assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    pcoll = pipeline | 'start-perkey' >> Create(
+        [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
     result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
-    result_kbot = pcoll | beam.CombinePerKey(
-        'bot-perkey', combine.Smallest(4))
+    result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey(
+        combine.Smallest(4))
     assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
     assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
     pipeline.run()
@@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase):
 
     # Now test per-key samples.
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | Create(
-        'start-perkey',
+    pcoll = pipeline | 'start-perkey' >> Create(
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
@@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (
         p
-        | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
         | beam.CombineGlobally(combine.TupleCombineFn(max,
                                                       combine.MeanCombineFn(),
                                                       sum)).without_defaults())


[05/12] incubator-beam git commit: fixup: failing tests expecting name

Posted by ro...@apache.org.
fixup: failing tests expecting name


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5b5b14d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5b5b14d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5b5b14d

Branch: refs/heads/python-sdk
Commit: c5b5b14d35fc7f6b0a576f0fca19b730015e1282
Parents: b15d35c
Author: Robert Bradshaw <ro...@google.com>
Authored: Sat Jul 23 01:07:44 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/ptransform_test.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5b5b14d/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 992f944..b99cd26 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -931,7 +931,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3]).with_output_types(int)
-       | beam.GroupByKeyOnly())
+       | 'F' >> beam.GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -945,7 +945,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | (beam.Create(range(5))
           .with_output_types(typehints.Iterable[int]))
-       | beam.GroupByKey())
+       | 'T' >> beam.GroupByKey())
 
     self.assertEqual("Input type hint violation at T: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1563,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create([1, 1, 2, 3])
+       | 'f' >> beam.Create([1, 1, 2, 3])
        | 'count elems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '


[03/12] incubator-beam git commit: Move names out of transform constructors.

Posted by ro...@apache.org.
Move names out of transform constructors.

sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'

A small number of tests will need to be fixed by hand.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce

Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |  96 ++--
 .../examples/complete/autocomplete.py           |   8 +-
 .../examples/complete/autocomplete_test.py      |   4 +-
 .../examples/complete/estimate_pi.py            |   8 +-
 .../examples/complete/estimate_pi_test.py       |   2 +-
 .../complete/juliaset/juliaset/juliaset.py      |   8 +-
 .../apache_beam/examples/complete/tfidf.py      |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py     |   2 +-
 .../examples/cookbook/bigquery_schema.py        |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |   6 +-
 .../cookbook/bigquery_side_input_test.py        |   8 +-
 .../examples/cookbook/bigquery_tornadoes.py     |   6 +-
 .../cookbook/bigquery_tornadoes_test.py         |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  18 +-
 .../apache_beam/examples/cookbook/coders.py     |   2 +-
 .../examples/cookbook/coders_test.py            |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py    |  10 +-
 .../examples/cookbook/filters_test.py           |   2 +-
 .../examples/cookbook/group_with_coder.py       |   6 +-
 .../examples/cookbook/mergecontacts.py          |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  18 +-
 .../apache_beam/examples/snippets/snippets.py   |  62 +--
 .../examples/snippets/snippets_test.py          |  10 +-
 .../apache_beam/examples/streaming_wordcap.py   |   2 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  14 +-
 .../apache_beam/examples/wordcount_debugging.py |  16 +-
 .../apache_beam/examples/wordcount_minimal.py   |  14 +-
 sdks/python/apache_beam/io/avroio.py            |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/iobase.py            |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |  42 +-
 sdks/python/apache_beam/pvalue_test.py          |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py    |  48 +-
 sdks/python/apache_beam/transforms/core.py      |  16 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 498 +++++++++----------
 sdks/python/apache_beam/transforms/util.py      |   8 +-
 .../apache_beam/transforms/window_test.py       |  14 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |  16 +-
 48 files changed, 537 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
   def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
     """A Count transform: v, ... => (v, n), ..."""
     return (pcoll
-            | Map('AddCount', lambda x: (x, 1))
-            | GroupByKey('GroupCounts')
-            | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+            | 'AddCount' >> Map(lambda x: (x, 1))
+            | 'GroupCounts' >> GroupByKey()
+            | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
 
   def test_word_count(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
+    lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
     result = (
-        (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+        (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
         .apply('CountWords', DataflowTest.Count))
     assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
     pipeline.run()
 
   def test_map(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    lines = pipeline | Create('input', ['a', 'b', 'c'])
+    lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
     result = (lines
-              | Map('upper', str.upper)
-              | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+              | 'upper' >> Map(str.upper)
+              | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
     assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
     pipeline.run()
 
   def test_par_do_with_side_input_as_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
-    prefix = pipeline | Create('SomeString', ['xyz'])  # side in
+    words = pipeline | 'SomeWords' >> Create(words_list)
+    prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
     suffix = 'zyx'
     result = words | FlatMap(
         'DecorateWords',
@@ -92,9 +92,9 @@ class DataflowTest(unittest.TestCase):
   def test_par_do_with_side_input_as_keyword_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
+    words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
-    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
     result = words | FlatMap(
         'DecorateWords',
         lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
@@ -111,10 +111,10 @@ class DataflowTest(unittest.TestCase):
 
     pipeline = Pipeline('DirectPipelineRunner')
     words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
+    words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
-    suffix = pipeline | Create('SomeString', ['xyz'])  # side in
-    result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+    result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
                            suffix=AsSingleton(suffix))
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
@@ -131,7 +131,7 @@ class DataflowTest(unittest.TestCase):
           yield SideOutputValue('odd', context.element)
 
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | ParDo(
         'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
     assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -147,7 +147,7 @@ class DataflowTest(unittest.TestCase):
         return [v, SideOutputValue('odd', v)]
 
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
     assert_that(results.main, equal_to([1, 2, 3, 4]))
@@ -157,37 +157,37 @@ class DataflowTest(unittest.TestCase):
 
   def test_empty_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [])  # Empty side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # Empty side input.
 
     def my_fn(k, s):
       v = ('empty' if isinstance(s, EmptySideInput) else 'full')
       return [(k, v)]
-    result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+    result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
     assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
     pipeline.run()
 
   def test_multi_valued_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
-    pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
+    pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side))  # pylint: disable=expression-not-assigned
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_default_value_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [])  # 0 values in side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # 0 values in side input.
     result = (
-        pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+        pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
     assert_that(result, equal_to([10, 20]))
     pipeline.run()
 
   def test_iterable_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcol = pipeline | Create('start', [1, 2])
-    side = pipeline | Create('side', [3, 4])  # 2 values in side input.
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
     result = pcol | FlatMap('compute',
                             lambda x, s: [x * y for y in s], AllOf(side))
     assert_that(result, equal_to([3, 4, 6, 8]))
@@ -195,7 +195,7 @@ class DataflowTest(unittest.TestCase):
 
   def test_undeclared_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -210,7 +210,7 @@ class DataflowTest(unittest.TestCase):
 
   def test_empty_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    nums = pipeline | Create('Some Numbers', [1, 3, 5])
+    nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
@@ -224,9 +224,9 @@ class DataflowTest(unittest.TestCase):
     a_list = [5, 1, 3, 2, 9]
     some_pairs = [('crouton', 17), ('supreme', None)]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
-    side_pairs = pipeline | Create('side pairs', some_pairs)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
     results = main_input | FlatMap(
         'concatenate',
         lambda x, the_list, the_dict: [[x, the_list, the_dict]],
@@ -248,8 +248,8 @@ class DataflowTest(unittest.TestCase):
     # with the same defaults will return the same PCollectionView.
     a_list = [2]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, s1, s2: [[x, s1, s2]],
@@ -271,8 +271,8 @@ class DataflowTest(unittest.TestCase):
     # distinct PCollectionViews with the same full_label.
     a_list = [2]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
 
     with self.assertRaises(RuntimeError) as e:
       _ = main_input | FlatMap(
@@ -287,8 +287,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_singleton_with_different_defaults_with_unique_labels(self):
     a_list = []
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, s1, s2: [[x, s1, s2]],
@@ -311,8 +311,8 @@ class DataflowTest(unittest.TestCase):
     # return the same PCollectionView.
     a_list = [1, 2, 3]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -332,8 +332,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_list_with_unique_labels(self):
     a_list = [1, 2, 3]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_list = pipeline | Create('side list', a_list)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
     results = main_input | FlatMap(
         'test',
         lambda x, ls1, ls2: [[x, ls1, ls2]],
@@ -353,8 +353,8 @@ class DataflowTest(unittest.TestCase):
   def test_as_dict_with_unique_labels(self):
     some_kvs = [('a', 1), ('b', 2)]
     pipeline = Pipeline('DirectPipelineRunner')
-    main_input = pipeline | Create('main input', [1])
-    side_kvs = pipeline | Create('side kvs', some_kvs)
+    main_input = pipeline | 'main input' >> Create([1])
+    side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
     results = main_input | FlatMap(
         'test',
         lambda x, dct1, dct2: [[x, dct1, dct2]],
@@ -383,10 +383,10 @@ class DataflowTest(unittest.TestCase):
         return existing_windows
 
     pipeline = Pipeline('DirectPipelineRunner')
-    numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+    numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
     result = (numbers
-              | WindowInto('W', windowfn=TestWindowFn())
-              | GroupByKey('G'))
+              | 'W' >> WindowInto(windowfn=TestWindowFn())
+              | 'G' >> GroupByKey())
     assert_that(
         result, equal_to([(1, [10]), (1, [10]), (2, [20]),
                           (2, [20]), (3, [30]), (3, [30])]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 0f1e96e..b68bc56 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,12 +45,12 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
-   | beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
-   | TopPerPrefix('TopPerPrefix', 5)
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+   | 'TopPerPrefix' >> TopPerPrefix(5)
    | beam.Map('format',
               lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 84f947b..18d0511 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
 
   def test_top_prefixes(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('create', self.WORDS)
-    result = words | autocomplete.TopPerPrefix('test', 5)
+    words = p | 'create' >> beam.Create(self.WORDS)
+    result = words | 'test' >> autocomplete.TopPerPrefix(5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
     assert_that(result, equal_to(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 09faecf..ef9f8cc 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -96,9 +96,9 @@ class EstimatePiTransform(beam.PTransform):
   def apply(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll
-            | beam.Create('Initialize', [100000] * 100).with_output_types(int)
-            | beam.Map('Run trials', run_trials)
-            | beam.CombineGlobally('Sum', combine_results).without_defaults())
+            | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
+            | 'Run trials' >> beam.Map(run_trials)
+            | 'Sum' >> beam.CombineGlobally(combine_results).without_defaults())
 
 
 def run(argv=None):
@@ -115,7 +115,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | EstimatePiTransform('Estimate')
+   | 'Estimate' >> EstimatePiTransform()
    | beam.io.Write('Write',
                    beam.io.TextFileSink(known_args.output,
                                         coder=JsonCoder())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index c633bb1..3967ed5 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase):
 
   def test_basics(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    result = p | estimate_pi.EstimatePiTransform('Estimate')
+    result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()
 
     # Note: Probabilistically speaking this test can fail with a probability
     # that is very small (VERY) given that we run at least 10 million trials.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 2bc37e6..56696c3 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -50,7 +50,7 @@ def generate_julia_set_colors(pipeline, c, n, max_iterations):
         yield (x, y)
 
   julia_set_colors = (pipeline
-                      | beam.Create('add points', point_set(n))
+                      | 'add points' >> beam.Create(point_set(n))
                       | beam.Map(
                           get_julia_set_point_color, c, n, max_iterations))
 
@@ -105,11 +105,11 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
-   | beam.GroupByKey('x coord') | beam.Map(
+  (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+   | 'x coord' >> beam.GroupByKey() | beam.Map(
        'format',
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index ef58cc0..043d5f6 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,7 +42,7 @@ def read_documents(pipeline, uris):
         pipeline
         | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
         | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
-  return pcolls | beam.Flatten('flatten read pcolls')
+  return pcolls | 'flatten read pcolls' >> beam.Flatten()
 
 
 class TfIdf(beam.PTransform):
@@ -59,9 +59,9 @@ class TfIdf(beam.PTransform):
     # PCollection to use as side input.
     total_documents = (
         uri_to_content
-        | beam.Keys('get uris')
-        | beam.RemoveDuplicates('get unique uris')
-        | beam.combiners.Count.Globally(' count uris'))
+        | 'get uris' >> beam.Keys()
+        | 'get unique uris' >> beam.RemoveDuplicates()
+        | ' count uris' >> beam.combiners.Count.Globally())
 
     # Create a collection of pairs mapping a URI to each of the words
     # in the document associated with that that URI.
@@ -71,28 +71,28 @@ class TfIdf(beam.PTransform):
 
     uri_to_words = (
         uri_to_content
-        | beam.FlatMap('split words', split_into_words))
+        | 'split words' >> beam.FlatMap(split_into_words))
 
     # Compute a mapping from each word to the total number of documents
     # in which it appears.
     word_to_doc_count = (
         uri_to_words
-        | beam.RemoveDuplicates('get unique words per doc')
-        | beam.Values('get words')
-        | beam.combiners.Count.PerElement('count docs per word'))
+        | 'get unique words per doc' >> beam.RemoveDuplicates()
+        | 'get words' >> beam.Values()
+        | 'count docs per word' >> beam.combiners.Count.PerElement())
 
     # Compute a mapping from each URI to the total number of words in the
     # document associated with that URI.
     uri_to_word_total = (
         uri_to_words
-        | beam.Keys(' get uris')
-        | beam.combiners.Count.PerElement('count words in doc'))
+        | ' get uris' >> beam.Keys()
+        | 'count words in doc' >> beam.combiners.Count.PerElement())
 
     # Count, for each (URI, word) pair, the number of occurrences of that word
     # in the document associated with the URI.
     uri_and_word_to_count = (
         uri_to_words
-        | beam.combiners.Count.PerElement('count word-doc pairs'))
+        | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
 
     # Adjust the above collection to a mapping from (URI, word) pairs to counts
     # into an isomorphic mapping from URI to (word, count) pairs, to prepare
@@ -116,7 +116,7 @@ class TfIdf(beam.PTransform):
     #                         ... ]}
     uri_to_word_and_count_and_total = (
         {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
-        | beam.CoGroupByKey('cogroup by uri'))
+        | 'cogroup by uri' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, term frequency) pair for each
     # URI. A word's term frequency for a document is simply the number of times
@@ -132,7 +132,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tf = (
         uri_to_word_and_count_and_total
-        | beam.FlatMap('compute term frequencies', compute_term_frequency))
+        | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
 
     # Compute a mapping from each word to its document frequency.
     # A word's document frequency in a corpus is the number of
@@ -155,7 +155,7 @@ class TfIdf(beam.PTransform):
     # each keyed on the word.
     word_to_uri_and_tf_and_df = (
         {'tf': word_to_uri_and_tf, 'df': word_to_df}
-        | beam.CoGroupByKey('cogroup words by tf-df'))
+        | 'cogroup words by tf-df' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
     # There are a variety of definitions of TF-IDF
@@ -170,7 +170,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tfidf = (
         word_to_uri_and_tf_and_df
-        | beam.FlatMap('compute tf-idf', compute_tf_idf))
+        | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
 
     return word_to_uri_and_tfidf
 
@@ -197,7 +197,7 @@ def run(argv=None):
   output = pcoll | TfIdf()
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 8f52611..ee7e534 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
     result = (
         uri_to_line
         | tfidf.TfIdf()
-        | beam.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+        | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
     beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index c46bfc5..7468484 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -134,9 +134,9 @@ class ComputeTopSessions(beam.PTransform):
             | beam.Filter(lambda x: (abs(hash(x)) <=
                                      MAX_TIMESTAMP * self.sampling_threshold))
             | ComputeSessions()
-            | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+            | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn())
             | TopPerMonth()
-            | beam.ParDo('FormatOutput', FormatOutputDoFn()))
+            | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn()))
 
 
 def run(argv=None):
@@ -168,9 +168,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.Read('read', beam.io.TextFileSource(known_args.input))
+   | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
    | ComputeTopSessions(known_args.sampling_threshold)
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
index fb48641..207d6c4 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -50,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase):
 
   def test_compute_top_sessions(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    edits = p | beam.Create('create', self.EDITS)
+    edits = p | 'create' >> beam.Create(self.EDITS)
     result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
 
     beam.assert_that(result, beam.equal_to(self.EXPECTED))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 7c420fb..650a886 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -111,8 +111,8 @@ def run(argv=None):
            }
 
   # pylint: disable=expression-not-assigned
-  record_ids = p | beam.Create('CreateIDs', ['1', '2', '3', '4', '5'])
-  records = record_ids | beam.Map('CreateRecords', create_random_record)
+  record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
+  records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
   records | beam.io.Write(
       'write',
       beam.io.BigQuerySink(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e1d9cf1..1db4a1e 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -105,9 +105,9 @@ def run(argv=None):
                                beam.io.BigQuerySource(query=query_corpus))
   pcoll_word = p | beam.Read('read words',
                              beam.io.BigQuerySource(query=query_word))
-  pcoll_ignore_corpus = p | beam.Create('create_ignore_corpus', [ignore_corpus])
-  pcoll_ignore_word = p | beam.Create('create_ignore_word', [ignore_word])
-  pcoll_group_ids = p | beam.Create('create groups', group_ids)
+  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+  pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
+  pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
 
   pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
                                pcoll_ignore_corpus, pcoll_ignore_word)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index bc75c41..215aafa 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -29,16 +29,16 @@ class BigQuerySideInputTest(unittest.TestCase):
   def test_create_groups(self):
     p = beam.Pipeline('DirectPipelineRunner')
 
-    group_ids_pcoll = p | beam.Create('create_group_ids', ['A', 'B', 'C'])
+    group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
     corpus_pcoll = p | beam.Create('create_corpus',
                                    [{'f': 'corpus1'},
                                     {'f': 'corpus2'},
                                     {'f': 'corpus3'}])
-    words_pcoll = p | beam.Create('create_words', [{'f': 'word1'},
+    words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
                                                    {'f': 'word2'},
                                                    {'f': 'word3'}])
-    ignore_corpus_pcoll = p | beam.Create('create_ignore_corpus', ['corpus1'])
-    ignore_word_pcoll = p | beam.Create('create_ignore_word', ['word1'])
+    ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
+    ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
 
     groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
                                                words_pcoll, ignore_corpus_pcoll,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index e732309..cdaee36 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -56,8 +56,8 @@ def count_tornadoes(input_data):
           | beam.FlatMap(
               'months with tornadoes',
               lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
-          | beam.CombinePerKey('monthly count', sum)
-          | beam.Map('format', lambda (k, v): {'month': k, 'tornado_count': v}))
+          | 'monthly count' >> beam.CombinePerKey(sum)
+          | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
 
 
 def run(argv=None):
@@ -77,7 +77,7 @@ def run(argv=None):
   p = beam.Pipeline(argv=pipeline_args)
 
   # Read the table rows into a PCollection.
-  rows = p | beam.io.Read('read', beam.io.BigQuerySource(known_args.input))
+  rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
   counts = count_tornadoes(rows)
 
   # Write the output using a "Write" transform that has side effects.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
index 2547849..87e1f44 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py
@@ -28,7 +28,7 @@ class BigQueryTornadoesTest(unittest.TestCase):
 
   def test_basics(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    rows = (p | beam.Create('create', [
+    rows = (p | 'create' >> beam.Create([
         {'month': 1, 'day': 1, 'tornado': False},
         {'month': 1, 'day': 2, 'tornado': True},
         {'month': 1, 'day': 3, 'tornado': True},

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index cde00b3..c29a038 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,30 +59,30 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   output = (lines
-            | beam.Map('split', lambda x: (x[:10], x[10:99])
+            | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
                       ).with_output_types(beam.typehints.KV[str, str])
-            | beam.GroupByKey('group')
+            | 'group' >> beam.GroupByKey()
             | beam.FlatMap(
                 'format',
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
     input_csum = (lines
-                  | beam.Map('input-csum', crc32line)
-                  | beam.CombineGlobally('combine-input-csum', sum)
-                  | beam.Map('hex-format', lambda x: '%x' % x))
+                  | 'input-csum' >> beam.Map(crc32line)
+                  | 'combine-input-csum' >> beam.CombineGlobally(sum)
+                  | 'hex-format' >> beam.Map(lambda x: '%x' % x))
     input_csum | beam.io.Write(
         'write-input-csum',
         beam.io.TextFileSink(known_args.checksum_output + '-input'))
 
     output_csum = (output
-                   | beam.Map('output-csum', crc32line)
-                   | beam.CombineGlobally('combine-output-csum', sum)
-                   | beam.Map('hex-format-output', lambda x: '%x' % x))
+                   | 'output-csum' >> beam.Map(crc32line)
+                   | 'combine-output-csum' >> beam.CombineGlobally(sum)
+                   | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
     output_csum | beam.io.Write(
         'write-output-csum',
         beam.io.TextFileSink(known_args.checksum_output + '-output'))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 1ce1fa5..bbe02b3 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -89,7 +89,7 @@ def run(argv=None):
   (p  # pylint: disable=expression-not-assigned
    | beam.io.Read('read',
                   beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
-   | beam.FlatMap('points', compute_points)
+   | 'points' >> beam.FlatMap(compute_points)
    | beam.CombinePerKey(sum)
    | beam.io.Write('write',
                    beam.io.TextFileSink(known_args.output, coder=JsonCoder())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py
index 5840081..75b78c8 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py
@@ -35,9 +35,9 @@ class CodersTest(unittest.TestCase):
 
   def test_compute_points(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    records = p | beam.Create('create', self.SAMPLE_RECORDS)
+    records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS)
     result = (records
-              | beam.FlatMap('points', coders.compute_points)
+              | 'points' >> beam.FlatMap(coders.compute_points)
               | beam.CombinePerKey(sum))
     assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)]))
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index d3d8b08..021eff6 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -39,7 +39,7 @@ class Count1(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | beam.Map('Init', lambda v: (v, 1))
+        | 'Init' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 
 
@@ -57,7 +57,7 @@ def Count2(pcoll):  # pylint: disable=invalid-name
   """Count as a decorated function."""
   return (
       pcoll
-      | beam.Map('Init', lambda v: (v, 1))
+      | 'Init' >> beam.Map(lambda v: (v, 1))
       | beam.CombinePerKey(sum))
 
 
@@ -84,7 +84,7 @@ def Count3(pcoll, factor=1):  # pylint: disable=invalid-name
   """
   return (
       pcoll
-      | beam.Map('Init', lambda v: (v, factor))
+      | 'Init' >> beam.Map(lambda v: (v, factor))
       | beam.CombinePerKey(sum))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
index 3c0c6f3..603742f 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py
@@ -40,7 +40,7 @@ class CustomCountTest(unittest.TestCase):
 
   def run_pipeline(self, count_implementation, factor=1):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('create', ['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
+    words = p | 'create' >> beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG'])
     result = words | count_implementation
     assert_that(
         result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))]))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index c309941..b19b566 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -59,14 +59,14 @@ def filter_cold_days(input_data, month_filter):
   # Compute the global mean temperature.
   global_mean = AsSingleton(
       fields_of_interest
-      | beam.Map('extract mean', lambda row: row['mean_temp'])
-      | beam.combiners.Mean.Globally('global mean'))
+      | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
+      | 'global mean' >> beam.combiners.Mean.Globally())
 
   # Filter to the rows representing days in the month of interest
   # in which the mean daily temperature is below the global mean.
   return (
       fields_of_interest
-      | beam.Filter('desired month', lambda row: row['month'] == month_filter)
+      | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
       | beam.Filter('below mean',
                     lambda row, mean: row['mean_temp'] < mean, global_mean))
 
@@ -88,11 +88,11 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  input_data = p | beam.Read('input', beam.io.BigQuerySource(known_args.input))
+  input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
 
   # pylint: disable=expression-not-assigned
   (filter_cold_days(input_data, known_args.month_filter)
-   | beam.io.Write('save to BQ', beam.io.BigQuerySink(
+   | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
        known_args.output,
        schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/filters_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py
index cf1ca7e..9e5592f 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py
@@ -36,7 +36,7 @@ class FiltersTest(unittest.TestCase):
 
   def _get_result_for_month(self, month):
     p = beam.Pipeline('DirectPipelineRunner')
-    rows = (p | beam.Create('create', self.input_data))
+    rows = (p | 'create' >> beam.Create(self.input_data))
 
     results = filters.filter_cold_days(rows, month)
     return results

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 140314e..6c86f61 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -98,19 +98,19 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
    # The get_players function is annotated with a type hint above, so the type
    # system knows the output type of the following operation is a key-value pair
    # of a Player and an int. Please see the documentation for details on
    # types that are inferred automatically as well as other ways to specify
    # type hints.
-   | beam.Map('get players', get_players)
+   | 'get players' >> beam.Map(get_players)
    # The output type hint of the previous step is used to infer that the key
    # type of the following operation is the Player type. Since a custom coder
    # is registered for the Player class above, a PlayerCoder will be used to
    # encode Player objects as keys for this combine operation.
    | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9e6b001..bf6d1b1 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -88,7 +88,7 @@ def run(argv=None, assert_results=None):
       known_args.input_snailmail))
 
   # Group together all entries under the same name.
-  grouped = (email, phone, snailmail) | beam.CoGroupByKey('group_by_name')
+  grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
 
   # Prepare tab-delimited output; something like this:
   # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
@@ -107,9 +107,9 @@ def run(argv=None, assert_results=None):
   nomads = grouped | beam.Filter(    # People without addresses.
       lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
 
-  num_luddites = luddites | beam.combiners.Count.Globally('luddites')
-  num_writers = writers | beam.combiners.Count.Globally('writers')
-  num_nomads = nomads | beam.combiners.Count.Globally('nomads')
+  num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
+  num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
+  num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
 
   # Write tab-delimited output.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 5bde591..187d20b 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -114,10 +114,10 @@ class CountWords(beam.PTransform):
 
   def apply(self, pcoll):
     return (pcoll
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones)))
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c)))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
 
 
 def run(argv=None):
@@ -137,7 +137,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | beam.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.Read(beam.io.TextFileSource(known_args.input))
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -155,21 +155,21 @@ def run(argv=None):
 
   # pylint: disable=expression-not-assigned
   (character_count
-   | beam.Map('pair_with_key', lambda x: ('chars_temp_key', x))
+   | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
-   | beam.Map('count chars', lambda (_, counts): sum(counts))
+   | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
    | beam.Write('write chars',
                 beam.io.TextFileSink(known_args.output + '-chars')))
 
   # pylint: disable=expression-not-assigned
   (short_words
-   | CountWords('count short words')
+   | 'count short words' >> CountWords()
    | beam.Write('write short words',
                 beam.io.TextFileSink(known_args.output + '-short-words')))
 
   # pylint: disable=expression-not-assigned
   (words
-   | CountWords('count words')
+   | 'count words' >> CountWords()
    | beam.Write('write words',
                 beam.io.TextFileSink(known_args.output + '-words')))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 3658619..c605db8 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -104,7 +104,7 @@ def construct_pipeline(renames):
   # [END pipelines_constructing_applying]
 
   # [START pipelines_constructing_writing]
-  filtered_words = reversed_words | beam.Filter('FilterWords', filter_words)
+  filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
   filtered_words | beam.io.Write('WriteMyFile',
                                  beam.io.TextFileSink(
                                      'gs://some/outputData.txt'))
@@ -242,8 +242,8 @@ def pipeline_options_remote(argv):
   options.view_as(StandardOptions).runner = 'DirectPipelineRunner'
   p = Pipeline(options=options)
 
-  lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
 
   p.run()
 
@@ -283,8 +283,8 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
-  lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(my_input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(my_output))
+  lines = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(my_input))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(my_output))
   p.run()
 
 
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
   p = beam.Pipeline(argv=pipeline_args)
   lines = p | beam.io.Read('ReadFromText',
                            beam.io.TextFileSource(known_args.input))
-  lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output))
+  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
   # [END pipeline_options_command_line]
 
   p.run()
@@ -344,8 +344,8 @@ def pipeline_logging(lines, output):
   p = beam.Pipeline(options=PipelineOptions())
   (p
    | beam.Create(lines)
-   | beam.ParDo('ExtractWords', ExtractWordsFn())
-   | beam.io.Write('WriteToText', beam.io.TextFileSink(output)))
+   | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
+   | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(output)))
 
   p.run()
 
@@ -391,11 +391,11 @@ def pipeline_monitoring(renames):
     def apply(self, pcoll):
       return (pcoll
               # Convert lines of text into individual words.
-              | beam.ParDo('ExtractWords', ExtractWordsFn())
+              | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
               # Count the number of times each word occurs.
               | beam.combiners.Count.PerElement()
               # Format each word and count into a printable string.
-              | beam.ParDo('FormatCounts', FormatCountsFn()))
+              | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
   # [END pipeline_monitoring_composite]
 
   pipeline_options = PipelineOptions()
@@ -405,11 +405,11 @@ def pipeline_monitoring(renames):
   # [START pipeline_monitoring_execution]
   (p
    # Read the lines of the input text.
-   | beam.io.Read('ReadLines', beam.io.TextFileSource(options.input))
+   | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
-   | beam.io.Write('WriteCounts', beam.io.TextFileSink(options.output)))
+   | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
   # [END pipeline_monitoring_execution]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -454,7 +454,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_read]
 
       # [START examples_wordcount_minimal_pardo]
-      | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       # [END examples_wordcount_minimal_pardo]
 
       # [START examples_wordcount_minimal_count]
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_map]
 
       # [START examples_wordcount_minimal_write]
-      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
       # [END examples_wordcount_minimal_write]
   )
 
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]
 
-  formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+  formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
 
@@ -591,9 +591,9 @@ def examples_wordcount_debugging(renames):
       p
       | beam.io.Read(beam.io.TextFileSource(
           'gs://dataflow-samples/shakespeare/kinglear.txt'))
-      | beam.FlatMap('ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       | beam.combiners.Count.PerElement()
-      | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # [START example_wordcount_debugging_assert]
   beam.assert_that(
@@ -601,7 +601,7 @@ def examples_wordcount_debugging(renames):
   # [END example_wordcount_debugging_assert]
 
   output = (filtered_words
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
             | beam.io.Write(
                 'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
 
@@ -682,7 +682,7 @@ def model_custom_source(count):
   # Using the source in an example pipeline.
   # [START model_custom_source_use_new_source]
   p = beam.Pipeline(options=PipelineOptions())
-  numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+  numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))
   # [END model_custom_source_use_new_source]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -712,7 +712,7 @@ def model_custom_source(count):
 
   # [START model_custom_source_use_ptransform]
   p = beam.Pipeline(options=PipelineOptions())
-  numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+  numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count)
   # [END model_custom_source_use_ptransform]
 
   lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
@@ -848,7 +848,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
 
   # [START model_custom_sink_use_ptransform]
   p = beam.Pipeline(options=PipelineOptions())
-  kvs = p | beam.core.Create('CreateKVs', KVs)
+  kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
   kvs | WriteToKVSink('WriteToSimpleKV',
                       'http://url_to_simple_kv/', final_table_name)
   # [END model_custom_sink_use_ptransform]
@@ -880,7 +880,7 @@ def model_textio(renames):
   # [END model_textio_read]
 
   # [START model_textio_write]
-  filtered_words = lines | beam.FlatMap('FilterWords', filter_words)
+  filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
   # [START model_pipelineio_write]
   filtered_words | beam.io.Write(
       'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
@@ -1053,7 +1053,7 @@ def model_group_by_key(contents, output_path):
       p
       | beam.Create(contents)
       | beam.FlatMap(lambda x: re.findall(r'\w+', x))
-      | beam.Map('one word', lambda w: (w, 1)))
+      | 'one word' >> beam.Map(lambda w: (w, 1)))
   # GroupByKey accepts a PCollection of (w, 1) and
   # outputs a PCollection of (w, (1, 1, ...)).
   # (A key/value pair is just a tuple in Python.)
@@ -1063,7 +1063,7 @@ def model_group_by_key(contents, output_path):
   grouped_words = words_and_counts | beam.GroupByKey()
   # [END model_group_by_key_transform]
   (grouped_words
-   | beam.Map('count words', lambda (word, counts): (word, len(counts)))
+   | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
    | beam.io.Write(beam.io.TextFileSink(output_path)))
   p.run()
 
@@ -1083,8 +1083,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   # multiple possible values for each key.
   # The phone_list contains values such as: ('mary': '111-222-3333') with
   # multiple possible values for each key.
-  emails = p | beam.Create('email', email_list)
-  phones = p | beam.Create('phone', phone_list)
+  emails = p | 'email' >> beam.Create(email_list)
+  phones = p | 'phone' >> beam.Create(phone_list)
   # The result PCollection contains one key-value element for each key in the
   # input PCollections. The key of the pair will be the key from the input and
   # the value will be a dictionary with two entries: 'emails' - an iterable of
@@ -1119,9 +1119,9 @@ def model_join_using_side_inputs(
   # This code performs a join by receiving the set of names as an input and
   # passing PCollections that contain emails and phone numbers as side inputs
   # instead of using CoGroupByKey.
-  names = p | beam.Create('names', name_list)
-  emails = p | beam.Create('email', email_list)
-  phones = p | beam.Create('phone', phone_list)
+  names = p | 'names' >> beam.Create(name_list)
+  emails = p | 'email' >> beam.Create(email_list)
+  phones = p | 'phone' >> beam.Create(phone_list)
 
   def join_info(name, emails, phone_numbers):
     filtered_emails = []
@@ -1149,7 +1149,7 @@ def model_join_using_side_inputs(
 class Keys(beam.PTransform):
 
   def apply(self, pcoll):
-    return pcoll | beam.Map('Keys', lambda (k, v): k)
+    return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
 # [END model_library_transforms_keys]
 # pylint: enable=invalid-name
 
@@ -1160,6 +1160,6 @@ class Count(beam.PTransform):
   def apply(self, pcoll):
     return (
         pcoll
-        | beam.Map('Init', lambda v: (v, 1))
+        | 'Init' >> beam.Map(lambda v: (v, 1))
         | beam.CombinePerKey(sum))
 # [END model_library_transforms_count]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7888263..9eba46a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -103,14 +103,14 @@ class ParDoTest(unittest.TestCase):
   def test_pardo_with_label(self):
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
-    result = words | beam.Map('CountUniqueLetters', lambda word: len(set(word)))
+    result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
     # [END model_pardo_with_label]
 
     self.assertEqual({1, 2, 4}, set(result))
 
   def test_pardo_side_input(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | beam.Create('start', ['a', 'bb', 'ccc', 'dddd'])
+    words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
 
     # [START model_pardo_side_input]
     # Callable takes additional arguments.
@@ -124,11 +124,11 @@ class ParDoTest(unittest.TestCase):
                     | beam.CombineGlobally(beam.combiners.MeanCombineFn()))
 
     # Call with explicit side inputs.
-    small_words = words | beam.FlatMap('small', filter_using_length, 0, 3)
+    small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
 
     # A single deferred side input.
     larger_than_average = (words
-                           | beam.FlatMap('large', filter_using_length,
+                           | 'large' >> beam.FlatMap(filter_using_length,
                                           lower_bound=pvalue.AsSingleton(
                                               avg_word_len)))
 
@@ -268,7 +268,7 @@ class TypeHintsTest(unittest.TestCase):
       evens = numbers | beam.ParDo(FilterEvensDoFn())
       # [END type_hints_do_fn]
 
-    words = p | beam.Create('words', ['a', 'bb', 'c'])
+    words = p | 'words' >> beam.Create(['a', 'bb', 'c'])
     # One can assert outputs and apply them to transforms as well.
     # Helps document the contract and checks it at pipeline construction time.
     # [START type_hints_transform]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcap.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py
index 7148e58..ef95a5f 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcap.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcap.py
@@ -49,7 +49,7 @@ def run(argv=None):
 
   # Capitalize the characters in each line.
   transformed = (lines
-                 | (beam.Map('capitalize', lambda x: x.upper())))
+                 | 'capitalize' >> (beam.Map(lambda x: x.upper())))
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index eda74dd..35c1abb 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -55,11 +55,11 @@ def run(argv=None):
                  | (beam.FlatMap('split',
                                  lambda x: re.findall(r'[A-Za-z\']+', x))
                     .with_output_types(unicode))
-                 | beam.Map('pair_with_one', lambda x: (x, 1))
+                 | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                  | beam.WindowInto(window.FixedWindows(15, 0))
-                 | beam.GroupByKey('group')
-                 | beam.Map('count', lambda (word, ones): (word, sum(ones)))
-                 | beam.Map('format', lambda tup: '%s: %d' % tup))
+                 | 'group' >> beam.GroupByKey()
+                 | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                 | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index bbfd43e..4744352 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -77,22 +77,22 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
 
   # Count the occurrences of each word.
   counts = (lines
-            | (beam.ParDo('split', WordExtractingDoFn())
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 74effed..e008b48 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -97,11 +97,11 @@ class CountWords(beam.PTransform):
 
   def apply(self, pcoll):
     return (pcoll
-            | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
 
 def run(argv=None):
@@ -126,9 +126,9 @@ def run(argv=None):
   # Read the text file[pattern] into a PCollection, count the occurrences of
   # each word and filter by a list of words.
   filtered_words = (
-      p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+      p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
       | CountWords()
-      | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
+      | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
   # assert_that is a convenient PTransform that checks a PCollection has an
   # expected value. Asserts are best used in unit tests with small data sets but
@@ -145,8 +145,8 @@ def run(argv=None):
   # "Write" transform that has side effects.
   # pylint: disable=unused-variable
   output = (filtered_words
-            | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
-            | beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
+            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+            | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c3c41d7..ce5b644 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -93,22 +93,22 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
 
   # Count the occurrences of each word.
   counts = (lines
-            | (beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+            | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                .with_output_types(unicode))
-            | beam.Map('pair_with_one', lambda x: (x, 1))
-            | beam.GroupByKey('group')
-            | beam.Map('count', lambda (word, ones): (word, sum(ones))))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | beam.Map('format', lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
+  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 4d1b245..7ad3842 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -43,7 +43,7 @@ class ReadFromAvro(PTransform):
     files, a ``PCollection`` for the records in these Avro files can be created
     in the following manner.
       p = df.Pipeline(argv=pipeline_args)
-      records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+      records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
 
     Each record of this ``PCollection`` will contain a single record read from a
     source. Records that are of simple types will be mapped into corresponding

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f2c56dc..f789312 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
 of the side table. The execution framework may use some caching techniques to
 share the side inputs between calls in order to avoid excessive reading::
 
-  main_table = pipeline | beam.io.Read(beam.io.BigQuerySource('very_big_table')
-  side_table = pipeline | beam.io.Read(beam.io.BigQuerySource('not_big_table')
+  main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+  side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
   results = (
       main_table
       | beam.Map('process data',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index c7837ec..1bf51b2 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -255,7 +255,7 @@ class TestFileBasedSource(unittest.TestCase):
     file_name, expected_data = _write_data(100)
     assert len(expected_data) == 100
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(file_name))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(file_name))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
@@ -263,7 +263,7 @@ class TestFileBasedSource(unittest.TestCase):
     pattern, expected_data = _write_pattern([34, 66, 40, 24, 24, 12])
     assert len(expected_data) == 200
     pipeline = beam.Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Read('Read', LineSource(pattern))
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(pattern))
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index de5e9d4..b683eb2 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -990,7 +990,7 @@ class Write(ptransform.PTransform):
     from apache_beam.io import iobase
     if isinstance(self.sink, iobase.NativeSink):
       # A native sink
-      return pcoll | _NativeWrite('native_write', self.sink)
+      return pcoll | 'native_write' >> _NativeWrite(self.sink)
     elif isinstance(self.sink, iobase.Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -1010,7 +1010,7 @@ class WriteImpl(ptransform.PTransform):
     self.sink = sink
 
   def apply(self, pcoll):
-    do_once = pcoll.pipeline | core.Create('DoOnce', [None])
+    do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | core.Map(
         'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 86ae45f..39816c0 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -69,7 +69,7 @@ class PipelineTest(unittest.TestCase):
 
   @staticmethod
   def custom_callable(pcoll):
-    return pcoll | FlatMap('+1', lambda x: [x + 1])
+    return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
 
   # Some of these tests designate a runner by name, others supply a runner.
   # This variation is just to verify that both means of runner specification
@@ -78,7 +78,7 @@ class PipelineTest(unittest.TestCase):
   class CustomTransform(PTransform):
 
     def apply(self, pcoll):
-      return pcoll | FlatMap('+1', lambda x: [x + 1])
+      return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
 
   class Visitor(PipelineVisitor):
 
@@ -98,33 +98,33 @@ class PipelineTest(unittest.TestCase):
 
   def test_create(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('label1', [1, 2, 3])
+    pcoll = pipeline | 'label1' >> Create([1, 2, 3])
     assert_that(pcoll, equal_to([1, 2, 3]))
 
     # Test if initial value is an iterator object.
-    pcoll2 = pipeline | Create('label2', iter((4, 5, 6)))
-    pcoll3 = pcoll2 | FlatMap('do', lambda x: [x + 10])
+    pcoll2 = pipeline | 'label2' >> Create(iter((4, 5, 6)))
+    pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
     assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
     pipeline.run()
 
   def test_create_singleton_pcollection(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('label', [[1, 2, 3]])
+    pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
     pipeline.run()
 
   def test_read(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Read('read', FakeSource([1, 2, 3]))
+    pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
     assert_that(pcoll, equal_to([1, 2, 3]))
     pipeline.run()
 
   def test_visit_entire_graph(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll1 = pipeline | Create('pcoll', [1, 2, 3])
-    pcoll2 = pcoll1 | FlatMap('do1', lambda x: [x + 1])
-    pcoll3 = pcoll2 | FlatMap('do2', lambda x: [x + 1])
-    pcoll4 = pcoll2 | FlatMap('do3', lambda x: [x + 1])
+    pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+    pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
+    pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
+    pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
     transform = PipelineTest.CustomTransform()
     pcoll5 = pcoll4 | transform
 
@@ -140,15 +140,15 @@ class PipelineTest(unittest.TestCase):
 
   def test_apply_custom_transform(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll = pipeline | Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
     result = pcoll | PipelineTest.CustomTransform()
     assert_that(result, equal_to([2, 3, 4]))
     pipeline.run()
 
   def test_reuse_custom_transform_instance(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
-    pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+    pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3])
+    pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
     pcoll1 | transform
     with self.assertRaises(RuntimeError) as cm:
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
 
     self.assertEqual(
         ['a-x', 'b-x', 'c-x'],
-        sorted(['a', 'b', 'c'] | AddSuffix('-x')))
+        sorted(['a', 'b', 'c'] | '-x' >> AddSuffix()))
 
   def test_cached_pvalues_are_refcounted(self):
     """Test that cached PValues are refcounted and deleted.
@@ -213,17 +213,17 @@ class PipelineTest(unittest.TestCase):
 
     gc.collect()
     count_threshold = len(gc.get_objects()) + 10000
-    biglist = pipeline | Create('oom:create', ['x'] * num_elements)
+    biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements)
     dupes = (
         biglist
-        | Map('oom:addone', lambda x: (x, 1))
-        | FlatMap('oom:dupes', create_dupes,
+        | 'oom:addone' >> Map(lambda x: (x, 1))
+        | 'oom:dupes' >> FlatMap(create_dupes,
                   AsIter(biglist)).with_outputs('side', main='main'))
     result = (
         (dupes.side, dupes.main, dupes.side)
-        | Flatten('oom:flatten')
-        | CombinePerKey('oom:combine', sum)
-        | Map('oom:check', check_memory, count_threshold))
+        | 'oom:flatten' >> Flatten()
+        | 'oom:combine' >> CombinePerKey(sum)
+        | 'oom:check' >> Map(check_memory, count_threshold))
 
     assert_that(result, equal_to([('x', 3 * num_elements)]))
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index bb742e0..323ca33 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -45,9 +45,9 @@ class PValueTest(unittest.TestCase):
 
   def test_pcollectionview_not_recreated(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    value = pipeline | Create('create1', [1, 2, 3])
-    value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
-    value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
+    value = pipeline | 'create1' >> Create([1, 2, 3])
+    value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
+    value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
     self.assertEqual(AsSingleton(value), AsSingleton(value))
     self.assertEqual(AsSingleton('new', value, default_value=1),
                      AsSingleton('new', value, default_value=1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
index b3b2968..3cd8d73 100644
--- a/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/consumer_tracking_pipeline_visitor_test.py
@@ -103,8 +103,8 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
                                pvalue.ListPCollectionView))
 
   def test_co_group_by_key(self):
-    emails = self.pipeline | Create('email', [('joe', 'joe@example.com')])
-    phones = self.pipeline | Create('phone', [('mary', '111-222-3333')])
+    emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
+    phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
     {'emails': emails, 'phones': phones} | CoGroupByKey()
 
     self.pipeline.visit(self.visitor)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 2863756..04de7fb 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -60,9 +60,9 @@ class RunnerTest(unittest.TestCase):
                      '--no_auth=True'
                  ]))
 
-    (p | ptransform.Create('create', [1, 2, 3])  # pylint: disable=expression-not-assigned
-     | ptransform.FlatMap('do', lambda x: [(x, x)])
-     | ptransform.GroupByKey('gbk'))
+    (p | 'create' >> ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
+     | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
+     | 'gbk' >> ptransform.GroupByKey())
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowPipelineRunner, remote_runner).run(p)
 


[08/12] incubator-beam git commit: Fixes examples

Posted by ro...@apache.org.
Fixes examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ff5630b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ff5630b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ff5630b

Branch: refs/heads/python-sdk
Commit: 2ff5630be06034e25d60e105bbe1a718ae06b4d6
Parents: 362f2e9
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Jul 22 16:04:29 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/complete/autocomplete.py | 4 ++--
 sdks/python/apache_beam/examples/complete/estimate_pi.py  | 3 ---
 sdks/python/apache_beam/examples/snippets/snippets.py     | 8 ++++----
 3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index b68bc56..10d9009 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -56,8 +56,8 @@ def run(argv=None):
 
 class TopPerPrefix(beam.PTransform):
 
-  def __init__(self, label, count):
-    super(TopPerPrefix, self).__init__(label)
+  def __init__(self, count):
+    super(TopPerPrefix, self).__init__()
     self._count = count
 
   def apply(self, words):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index ef9f8cc..c33db1d 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -90,9 +90,6 @@ class JsonCoder(object):
 class EstimatePiTransform(beam.PTransform):
   """Runs 10M trials, and combine the results to estimate pi."""
 
-  def __init__(self, label):
-    super(EstimatePiTransform, self).__init__(label)
-
   def apply(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index c605db8..9d1df82 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_map]
 
       # [START examples_wordcount_minimal_write]
-      | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
       # [END examples_wordcount_minimal_write]
   )
 
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]
 
-  formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+  formatted |  beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
 
@@ -702,8 +702,8 @@ def model_custom_source(count):
   # [START model_custom_source_new_ptransform]
   class ReadFromCountingSource(PTransform):
 
-    def __init__(self, label, count, **kwargs):
-      super(ReadFromCountingSource, self).__init__(label, **kwargs)
+    def __init__(self, count, **kwargs):
+      super(ReadFromCountingSource, self).__init__(**kwargs)
       self._count = count
 
     def apply(self, pcoll):


[06/12] incubator-beam git commit: fix pipeline test

Posted by ro...@apache.org.
fix pipeline test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/362f2e9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/362f2e9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/362f2e9e

Branch: refs/heads/python-sdk
Commit: 362f2e9e4398662d55a2a4e6399f43155c58ed24
Parents: 0183051
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jul 22 16:03:08 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline_test.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/362f2e9e/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 39816c0..327f26c 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -183,7 +183,7 @@ class PipelineTest(unittest.TestCase):
 
     self.assertEqual(
         ['a-x', 'b-x', 'c-x'],
-        sorted(['a', 'b', 'c'] | '-x' >> AddSuffix()))
+        sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
 
   def test_cached_pvalues_are_refcounted(self):
     """Test that cached PValues are refcounted and deleted.
@@ -218,7 +218,7 @@ class PipelineTest(unittest.TestCase):
         biglist
         | 'oom:addone' >> Map(lambda x: (x, 1))
         | 'oom:dupes' >> FlatMap(create_dupes,
-                  AsIter(biglist)).with_outputs('side', main='main'))
+            AsIter(biglist)).with_outputs('side', main='main'))
     result = (
         (dupes.side, dupes.main, dupes.side)
         | 'oom:flatten' >> Flatten()
@@ -232,8 +232,8 @@ class PipelineTest(unittest.TestCase):
         {
             'oom:flatten': 3 * num_elements,
             ('oom:combine/GroupByKey/reify_windows', None): 3 * num_elements,
-            ('oom:dupes/oom:dupes', 'side'): num_elements,
-            ('oom:dupes/oom:dupes', None): num_elements,
+            ('oom:dupes/FlatMap(create_dupes)', 'side'): num_elements,
+            ('oom:dupes/FlatMap(create_dupes)', None): num_elements,
             'oom:create': num_elements,
             ('oom:addone', None): num_elements,
             'oom:combine/GroupByKey/group_by_key': 1,


[09/12] incubator-beam git commit: Fix label-sensitive test.

Posted by ro...@apache.org.
Fix label-sensitive test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a59a121
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a59a121
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a59a121

Branch: refs/heads/python-sdk
Commit: 2a59a121441a003bad949ae6a23d58a9cf2b3059
Parents: 2ff5630
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 16:24:48 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py                   |  4 ++++
 sdks/python/apache_beam/transforms/ptransform_test.py | 14 +++++++-------
 2 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index aeed9f9..0572466 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -348,6 +348,10 @@ class AppliedPTransform(object):
     # root producer.
     self.refcounts = collections.defaultdict(int)
 
+  def __repr__(self):
+    return "%s(%s, %s)" % (self.__class__.__name__, self.full_label,
+                           type(self.transform).__name__)
+
   def update_input_refcounts(self):
     """Increment refcounts for all transforms providing inputs."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8121c1e..3a71ec3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,28 +54,28 @@ class PTransformTest(unittest.TestCase):
 
     pa = Pipeline('DirectPipelineRunner')
     res = pa | 'a_label' >> beam.Create([1, 2])
-    self.assertEqual('<Create(PTransform) label=[a_label]>',
-                     str(res.producer.transform))
+    self.assertEqual('AppliedPTransform(a_label, Create)',
+                     str(res.producer))
 
     pc = Pipeline('DirectPipelineRunner')
-    res = pc | 'with_inputs' >> beam.Create([1, 2])
+    res = pc | beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
     self.assertEqual(
-        """<Create(PTransform) label=[with_inputs] inputs=('ci',)>""",
+        """<Create(PTransform) label=[Create] inputs=('ci',)>""",
         str(inputs_tr))
 
     pd = Pipeline('DirectPipelineRunner')
-    res = pd | 'with_sidei' >> beam.Create([1, 2])
+    res = pd | beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
     self.assertEqual(
-        '<Create(PTransform) label=[with_sidei] side_inputs=(4,)>',
+        '<Create(PTransform) label=[Create] side_inputs=(4,)>',
         str(side_tr))
 
     inputs_tr.side_inputs = ('cs',)
     self.assertEqual(
-        """<Create(PTransform) label=[with_inputs] """
+        """<Create(PTransform) label=[Create] """
         """inputs=('ci',) side_inputs=('cs',)>""",
         str(inputs_tr))
 


[10/12] incubator-beam git commit: Lint fixes.

Posted by ro...@apache.org.
Lint fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca

Branch: refs/heads/python-sdk
Commit: b15d35ca6e585e75153e05d96403336889cc6894
Parents: 2a59a12
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 18:35:22 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py        |   7 +-
 .../complete/juliaset/juliaset/juliaset.py      |   9 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/cookbook/bigquery_side_input.py    |  14 +-
 .../cookbook/bigquery_side_input_test.py        |   4 +-
 .../examples/cookbook/bigquery_tornadoes.py     |   6 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |   5 +-
 .../apache_beam/examples/cookbook/filters.py    |   2 +-
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 .../examples/snippets/snippets_test.py          |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |   2 +-
 .../apache_beam/examples/wordcount_debugging.py |   2 +-
 .../apache_beam/examples/wordcount_minimal.py   |   2 +-
 sdks/python/apache_beam/io/bigquery.py          |   4 +-
 sdks/python/apache_beam/pipeline_test.py        |   4 +-
 .../apache_beam/transforms/combiners_test.py    |   4 +-
 sdks/python/apache_beam/transforms/core.py      |   7 +-
 .../apache_beam/transforms/ptransform_test.py   | 161 ++++++++++---------
 sdks/python/apache_beam/transforms/util.py      |   3 +-
 .../typehints/typed_pipeline_test.py            |   2 +-
 20 files changed, 127 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index bf66851..cc3a526 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase):
     words = pipeline | 'SomeWords' >> Create(words_list)
     prefix = 'zyx'
     suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-    result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
-                           suffix=AsSingleton(suffix))
+    result = words | 'DecorateWordsDoFn' >> ParDo(
+        SomeDoFn(), prefix, suffix=AsSingleton(suffix))
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
 
@@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase):
     pipeline = Pipeline('DirectPipelineRunner')
     pcol = pipeline | 'start' >> Create([1, 2])
     side = pipeline | 'side' >> Create([])  # 0 values in side input.
-    result = (
-        pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
+    result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
     assert_that(result, equal_to([10, 20]))
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 56696c3..1445fbe 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,11 +105,12 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
-   | 'x coord' >> beam.GroupByKey() | beam.Map(
-       'format',
+  (coordinates
+   | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+   | 'x coord' >> beam.GroupByKey()
+   | 'format' >> beam.Map(
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+   | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index ee7e534..f30b832 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase):
     result = (
         uri_to_line
         | tfidf.TfIdf()
-        | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+        | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
     beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS))
     # Run the pipeline. Note that the assert_that above adds to the pipeline
     # a check that the result PCollection contains expected values. To actually

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 1db4a1e..2099e48 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -101,11 +101,12 @@ def run(argv=None):
   ignore_corpus = known_args.ignore_corpus
   ignore_word = known_args.ignore_word
 
-  pcoll_corpus = p | beam.Read('read corpus',
-                               beam.io.BigQuerySource(query=query_corpus))
-  pcoll_word = p | beam.Read('read words',
-                             beam.io.BigQuerySource(query=query_word))
-  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus])
+  pcoll_corpus = p | 'read corpus' >> beam.io.Read(
+      beam.io.BigQuerySource(query=query_corpus))
+  pcoll_word = p | 'read_words' >> beam.Read(
+      beam.io.BigQuerySource(query=query_word))
+  pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create(
+      [ignore_corpus])
   pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word])
   pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids)
 
@@ -113,8 +114,7 @@ def run(argv=None):
                                pcoll_ignore_corpus, pcoll_ignore_word)
 
   # pylint:disable=expression-not-assigned
-  pcoll_groups | beam.io.Write('WriteToText',
-                               beam.io.TextFileSink(known_args.output))
+  pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 215aafa..e2b20f3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -35,8 +35,8 @@ class BigQuerySideInputTest(unittest.TestCase):
                                     {'f': 'corpus2'},
                                     {'f': 'corpus3'}])
     words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
-                                                   {'f': 'word2'},
-                                                   {'f': 'word3'}])
+                                                     {'f': 'word2'},
+                                                     {'f': 'word3'}])
     ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
     ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index cdaee36..6e1326c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -53,11 +53,11 @@ def count_tornadoes(input_data):
   """
 
   return (input_data
-          | beam.FlatMap(
-              'months with tornadoes',
+          | 'months with tornadoes' >> beam.FlatMap(
               lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
           | 'monthly count' >> beam.CombinePerKey(sum)
-          | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v}))
+          | 'format' >> beam.Map(
+              lambda (k, v): {'month': k, 'tornado_count': v}))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index c29a038..f7070dc 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -59,8 +59,9 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   output = (lines
-            | 'split' >> beam.Map(lambda x: (x[:10], x[10:99])
-                      ).with_output_types(beam.typehints.KV[str, str])
+            | 'split' >> beam.Map(
+                lambda x: (x[:10], x[10:99]))
+            .with_output_types(beam.typehints.KV[str, str])
             | 'group' >> beam.GroupByKey()
             | beam.FlatMap(
                 'format',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index b19b566..efd0ba7 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -88,7 +88,7 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
 
-  input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input))
+  input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input))
 
   # pylint: disable=expression-not-assigned
   (filter_cold_days(input_data, known_args.month_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 9d1df82..9f3d6e1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -307,7 +307,7 @@ def pipeline_options_command_line(argv):
   p = beam.Pipeline(argv=pipeline_args)
   lines = p | beam.io.Read('ReadFromText',
                            beam.io.TextFileSource(known_args.input))
-  lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
   # [END pipeline_options_command_line]
 
   p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9eba46a..edc0a17 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -101,6 +101,7 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'A', 'C'}, set(all_capitals))
 
   def test_pardo_with_label(self):
+    # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
     result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
@@ -127,10 +128,9 @@ class ParDoTest(unittest.TestCase):
     small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
 
     # A single deferred side input.
-    larger_than_average = (words
-                           | 'large' >> beam.FlatMap(filter_using_length,
-                                          lower_bound=pvalue.AsSingleton(
-                                              avg_word_len)))
+    larger_than_average = (words | 'large' >> beam.FlatMap(
+        filter_using_length,
+        lower_bound=pvalue.AsSingleton(avg_word_len)))
 
     # Mix and match.
     small_but_nontrivial = words | beam.FlatMap(filter_using_length,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 4744352..096e508 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -82,7 +82,7 @@ def run(argv=None):
   # Count the occurrences of each word.
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index e008b48..473a486 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
   def apply(self, pcoll):
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index ce5b644..4073f7b 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -98,7 +98,7 @@ def run(argv=None):
   # Count the occurrences of each word.
   counts = (lines
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-               .with_output_types(unicode))
+                          .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
             | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index f789312..50c2eaf 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows
 of the side table. The execution framework may use some caching techniques to
 share the side inputs between calls in order to avoid excessive reading::
 
-  main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource()
-  side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource()
+  main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource())
+  side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource())
   results = (
       main_table
       | beam.Map('process data',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 327f26c..8a0d246 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -217,8 +217,8 @@ class PipelineTest(unittest.TestCase):
     dupes = (
         biglist
         | 'oom:addone' >> Map(lambda x: (x, 1))
-        | 'oom:dupes' >> FlatMap(create_dupes,
-            AsIter(biglist)).with_outputs('side', main='main'))
+        | 'oom:dupes' >> FlatMap(
+            create_dupes, AsIter(biglist)).with_outputs('side', main='main'))
     result = (
         (dupes.side, dupes.main, dupes.side)
         | 'oom:flatten' >> Flatten()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index c970382..bfe168d 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -219,8 +219,8 @@ class CombineTest(unittest.TestCase):
         return main | Map(lambda _, s: s, side)
 
     p = Pipeline('DirectPipelineRunner')
-    result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
-    result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
+    result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput()
+    result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')
     assert_that(result2, equal_to([10]), label='r2')
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 44a6d29..5e6aafc 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,9 +814,9 @@ class CombineGlobally(PTransform):
         return transform
 
     combined = (pcoll
-                | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
-                                  .with_output_types(
-                                      KV[None, pcoll.element_type]))
+                | 'KeyWithVoid' >> add_input_types(
+                    Map(lambda v: (None, v)).with_output_types(
+                        KV[None, pcoll.element_type]))
                 | CombinePerKey(
                     'CombinePerKey', self.fn, *self.args, **self.kwargs)
                 | 'UnKey' >> Map(lambda (k, v): v))
@@ -1044,6 +1044,7 @@ class GroupByKey(PTransform):
           KV[key_type, Iterable[typehints.WindowedValue[value_type]]])
       gbk_output_type = KV[key_type, Iterable[value_type]]
 
+      # pylint: disable=bad-continuation
       return (pcoll
               | 'reify_windows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 3a71ec3..992f944 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
     assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
                                   ('b', sum(vals_2) / len(vals_2))]))
@@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
     assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
     pipeline.run()
@@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase):
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
-                                             [('b', x) for x in vals_2]))
+                                               [('b', x) for x in vals_2]))
     divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombinePerKey(
         lambda vals, d: max(v for v in vals if v % d == 0),
@@ -513,13 +513,14 @@ class PTransformTest(unittest.TestCase):
   def test_apply_to_list(self):
     self.assertItemsEqual(
         [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
-    self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+    self.assertItemsEqual([1],
+                          [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
     self.assertItemsEqual([1, 2, 100, 3],
-                          ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
+                          ([1, 2, 3], [100]) | beam.Flatten())
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
     self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
-                          join_input | 'join' >> beam.CoGroupByKey())
+                          join_input | beam.CoGroupByKey())
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
@@ -776,9 +777,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-       | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
+       | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
           .with_input_types(str).with_output_types(int))
-       | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
+       | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -866,7 +867,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_filter_type_checks_using_type_hints_method(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+         | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
          | 'to int' >> beam.Map(lambda x: int(x))
          .with_input_types(str).with_output_types(int)
          | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
@@ -904,9 +905,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_group_by_key_only_output_type_deduction(self):
     d = (self.p
          | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
+         | ('pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | 'O' >> beam.GroupByKeyOnly())
+         | beam.GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -916,9 +917,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_group_by_key_output_type_deduction(self):
     d = (self.p
          | 'str' >> beam.Create(range(20)).with_output_types(int)
-         | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
+         | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
             .with_output_types(typehints.KV[int, int]))
-         | 'T' >> beam.GroupByKey())
+         | beam.GroupByKey())
 
     # Output type should correctly be deduced.
     # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +930,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # GBK will be passed raw int's here instead of some form of KV[A, B].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
-       | 'F' >> beam.GroupByKeyOnly())
+       | beam.Create([1, 2, 3]).with_output_types(int)
+       | beam.GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +943,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # aliased to Tuple[int, str].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> (beam.Create(range(5))
+       | (beam.Create(range(5))
           .with_output_types(typehints.Iterable[int]))
-       | 'T' >> beam.GroupByKey())
+       | beam.GroupByKey())
 
     self.assertEqual("Input type hint violation at T: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -973,7 +974,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       (self.p
        | 'nums' >> beam.Create(range(5)).with_output_types(int)
        | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
-       | 'G' >> beam.GroupByKeyOnly())
+       | beam.GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1092,10 +1093,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # ParDo should receive an instance of type 'str', however an 'int' will be
     # passed instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 'n' >> beam.Create([1, 2, 3])
-       | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
-          .with_input_types(str).with_output_types(int))
-      )
+      (self.p
+       | beam.Create([1, 2, 3])
+       | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+          .with_input_types(str).with_output_types(int)))
       self.p.run()
 
     self.assertStartswith(
@@ -1111,8 +1112,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
+       | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
@@ -1136,8 +1137,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         lambda x: [float(x)]).with_input_types(int).with_output_types(
             int).get_type_hints()
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 'n' >> beam.Create([1, 2, 3])
-       | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
+      (self.p
+       | beam.Create([1, 2, 3])
+       | ('to int' >> beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
       self.p.run()
@@ -1159,8 +1161,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # of 'int' will be generated instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
+       | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )
@@ -1183,7 +1185,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return a + b
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+      (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
       self.p.run()
 
     self.assertStartswith(
@@ -1199,8 +1201,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 't' >> beam.Create([1, 2, 3, 4])
-       | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
+       | beam.Create([1, 2, 3, 4])
+       | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
           .with_input_types(int, int)
           .with_output_types(float)))
       self.p.run()
@@ -1305,8 +1307,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_combine_pipeline_type_check_using_methods(self):
     d = (self.p
-         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
+         | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | ('concat' >> beam.CombineGlobally(lambda s: ''.join(s))
             .with_input_types(str).with_output_types(str)))
 
     def matcher(expected):
@@ -1321,8 +1323,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 's' >> beam.Create(range(5)).with_output_types(int)
-         | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
             .with_input_types(int).with_output_types(int)))
 
     assert_that(d, equal_to([10]))
@@ -1331,8 +1333,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_combine_pipeline_type_check_violation_using_methods(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(range(3)).with_output_types(int)
-       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | beam.Create(range(3)).with_output_types(int)
+       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1347,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(range(3)).with_output_types(int)
-       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | beam.Create(range(3)).with_output_types(int)
+       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
 
@@ -1427,8 +1429,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
          | 'even mean' >> combine.Mean.PerKey())
 
@@ -1439,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
-       | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
+       | beam.Create(map(str, range(5))).with_output_types(str)
+       | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
           .with_output_types(typehints.KV[str, str]))
        | 'even mean' >> combine.Mean.PerKey())
       self.p.run()
@@ -1455,8 +1457,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
             .with_output_types(typehints.KV[bool, int]))
          | 'odd mean' >> combine.Mean.PerKey())
 
@@ -1470,8 +1472,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'c' >> beam.Create(range(5)).with_output_types(int)
-       | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
+       | beam.Create(range(5)).with_output_types(int)
+       | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
           .with_output_types(typehints.KV[int, str]))
        | 'odd mean' >> combine.Mean.PerKey())
       self.p.run()
@@ -1514,8 +1516,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | 'p' >> beam.Create(range(5)).with_output_types(int)
-         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
+         | beam.Create(range(5)).with_output_types(int)
+         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
          | 'count int' >> combine.Count.PerKey())
 
@@ -1526,7 +1528,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perkey_pipeline_type_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'p' >> beam.Create(range(5)).with_output_types(int)
+       | beam.Create(range(5)).with_output_types(int)
        | 'count int' >> combine.Count.PerKey())
 
     self.assertEqual("Input type hint violation at GroupByKey: "
@@ -1538,7 +1540,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
          | 'dup key' >> beam.Map(lambda x: (x, x))
          .with_output_types(typehints.KV[str, str])
          | 'count dups' >> combine.Count.PerKey())
@@ -1549,7 +1551,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perelement_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+         | beam.Create([1, 1, 2, 3]).with_output_types(int)
          | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[int, int], d.element_type)
@@ -1561,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'f' >> beam.Create([1, 1, 2, 3])
+       | beam.Create([1, 1, 2, 3])
        | 'count elems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
@@ -1573,7 +1575,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'w' >> beam.Create([True, True, False, True, True])
+         | beam.Create([True, True, False, True, True])
          .with_output_types(bool)
          | 'count elems' >> combine.Count.PerElement())
 
@@ -1583,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_top_of_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+         | beam.Create(range(5, 11)).with_output_types(int)
          | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[int],
@@ -1595,7 +1597,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'n' >> beam.Create(list('testing')).with_output_types(str)
+         | beam.Create(list('testing')).with_output_types(str)
          | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[str], d.element_type)
@@ -1605,7 +1607,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'n' >> beam.Create(range(100)).with_output_types(int)
+       | beam.Create(range(100)).with_output_types(int)
        | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
        | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1616,8 +1618,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'n' >> beam.Create(range(100)).with_output_types(int)
-         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+         | beam.Create(range(100)).with_output_types(int)
+         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
          | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1630,8 +1632,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'n' >> beam.Create(range(21))
-         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
+         | beam.Create(range(21))
+         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
          | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
@@ -1642,7 +1644,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_globally_pipeline_satisfied(self):
     d = (self.p
-         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | beam.Create([2, 2, 3, 3]).with_output_types(int)
          | 'sample' >> combine.Sample.FixedSizeGlobally(3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1658,7 +1660,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | beam.Create([2, 2, 3, 3]).with_output_types(int)
          | 'sample' >> combine.Sample.FixedSizeGlobally(2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
@@ -1672,7 +1674,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_per_key_pipeline_satisfied(self):
     d = (self.p
-         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+         | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
          | 'sample' >> combine.Sample.FixedSizePerKey(2))
 
@@ -1691,7 +1693,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
+         | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
          | 'sample' >> combine.Sample.FixedSizePerKey(1))
 
@@ -1708,8 +1710,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_to_list_pipeline_check_satisfied(self):
     d = (self.p
-         | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
-         | 'to list' >> combine.ToList())
+         | beam.Create((1, 2, 3, 4)).with_output_types(int)
+         | combine.ToList())
 
     self.assertCompatible(typehints.List[int], d.element_type)
 
@@ -1724,8 +1726,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(list('test')).with_output_types(str)
-         | 'to list' >> combine.ToList())
+         | beam.Create(list('test')).with_output_types(str)
+         | combine.ToList())
 
     self.assertCompatible(typehints.List[str], d.element_type)
 
@@ -1739,8 +1741,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-       | 'to dict' >> combine.ToDict())
+       | beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | combine.ToDict())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1751,9 +1753,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_satisfied(self):
     d = (self.p
          | beam.Create(
-             'd',
              [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
-         | 'to dict' >> combine.ToDict())
+         | combine.ToDict())
 
     self.assertCompatible(typehints.Dict[int, int], d.element_type)
     assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1764,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'd' >> (beam.Create([('1', 2), ('3', 4)])
+         | (beam.Create([('1', 2), ('3', 4)])
             .with_output_types(typehints.Tuple[str, int]))
-         | 'to dict' >> combine.ToDict())
+         | combine.ToDict())
 
     self.assertCompatible(typehints.Dict[str, int], d.element_type)
     assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,7 +1777,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(TypeError) as e:
       (self.p
-       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | beam.Create([1, 2, 3]).with_output_types(int)
        | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
       self.p.run()
 
@@ -1799,7 +1800,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
-    created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+    created = self.p | beam.Create(['a', 'b', 'c'])
     mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
     grouped = mapped | beam.GroupByKey()
     self.assertEqual(str, created.element_type)
@@ -1810,7 +1811,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_inferred_bad_kv_type(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       _ = (self.p
-           | 't' >> beam.Create(['a', 'b', 'c'])
+           | beam.Create(['a', 'b', 'c'])
            | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
            | beam.GroupByKey())
 
@@ -1821,11 +1822,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_type_inference_command_line_flag_toggle(self):
     self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | 't' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
     self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index bbb7787..4564cf9 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -220,7 +220,8 @@ def assert_that(actual, matcher, label='assert_that'):
   class AssertThat(PTransform):
 
     def apply(self, pipeline):
-      return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
+      return pipeline | 'singleton' >> Create([None]) | Map(match,
+                                                            AllOf(actual))
 
     def default_label(self):
       return label

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 4e1ab68..f2e8f12 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -177,7 +177,7 @@ class SideInputTest(unittest.TestCase):
 
     bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
+      main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
 
   def test_deferred_side_input_iterable(self):
     @typehints.with_input_types(str, typehints.Iterable[str])