Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/19 21:26:24 UTC

[3/4] beam git commit: Remove unneeded labels, and convert existing labels to UpperCamelCase.

Remove unneeded labels, and convert existing labels to UpperCamelCase.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d863e686
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d863e686
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d863e686

Branch: refs/heads/python-sdk
Commit: d863e6863b1487aa884151c9a317076dab8facd6
Parents: 0db60e4
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jan 19 12:28:48 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jan 19 12:28:48 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/complete/tfidf.py      |  38 +-
 .../cookbook/bigquery_side_input_test.py        |  13 +-
 .../apache_beam/examples/cookbook/filters.py    |  12 +-
 .../examples/cookbook/mergecontacts.py          |  16 +-
 .../apache_beam/examples/streaming_wordcount.py |  10 +-
 sdks/python/apache_beam/io/fileio_test.py       |  12 +-
 sdks/python/apache_beam/io/iobase.py            |  12 +-
 sdks/python/apache_beam/io/textio_test.py       |   6 +-
 sdks/python/apache_beam/runners/runner_test.py  |  25 +-
 .../apache_beam/transforms/ptransform_test.py   | 513 +++++++++----------
 10 files changed, 325 insertions(+), 332 deletions(-)
----------------------------------------------------------------------
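
For context on the rename: the Beam Python SDK attaches a step label to a
transform with the right-shift operator, and that label becomes the step name
runners display; step names must be unique within a pipeline. A minimal sketch
of the UpperCamelCase convention this commit adopts (the pipeline contents are
illustrative, not taken from the files below):

    import apache_beam as beam

    # Each string label names a step; step names must be unique per pipeline.
    with beam.Pipeline() as p:
        (p
         | 'ReadLines' >> beam.Create(['to be', 'or not to be'])
         | 'SplitWords' >> beam.FlatMap(lambda line: line.split())
         | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
         | 'CountPerWord' >> beam.CombinePerKey(sum)
         | 'FormatCounts' >> beam.Map(lambda kv: '%s: %d' % kv))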


http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index c048cdd..367e275 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -42,9 +42,9 @@ def read_documents(pipeline, uris):
   for uri in uris:
     pcolls.append(
         pipeline
-        | 'read: %s' % uri >> ReadFromText(uri)
-        | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
-  return pcolls | 'flatten read pcolls' >> beam.Flatten()
+        | 'Read: %s' % uri >> ReadFromText(uri)
+        | 'WithKey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri))
+  return pcolls | 'FlattenReadPColls' >> beam.Flatten()
 
 
 class TfIdf(beam.PTransform):
@@ -61,9 +61,9 @@ class TfIdf(beam.PTransform):
     # PCollection to use as side input.
     total_documents = (
         uri_to_content
-        | 'get uris' >> beam.Keys()
-        | 'get unique uris' >> beam.RemoveDuplicates()
-        | ' count uris' >> beam.combiners.Count.Globally())
+        | 'GetUris 1' >> beam.Keys()
+        | 'GetUniqueUris' >> beam.RemoveDuplicates()
+        | 'CountUris' >> beam.combiners.Count.Globally())
 
     # Create a collection of pairs mapping a URI to each of the words
     # in the document associated with that URI.
@@ -73,35 +73,35 @@ class TfIdf(beam.PTransform):
 
     uri_to_words = (
         uri_to_content
-        | 'split words' >> beam.FlatMap(split_into_words))
+        | 'SplitWords' >> beam.FlatMap(split_into_words))
 
     # Compute a mapping from each word to the total number of documents
     # in which it appears.
     word_to_doc_count = (
         uri_to_words
-        | 'get unique words per doc' >> beam.RemoveDuplicates()
-        | 'get words' >> beam.Values()
-        | 'count docs per word' >> beam.combiners.Count.PerElement())
+        | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
+        | 'GetWords' >> beam.Values()
+        | 'CountDocsPerWord' >> beam.combiners.Count.PerElement())
 
     # Compute a mapping from each URI to the total number of words in the
     # document associated with that URI.
     uri_to_word_total = (
         uri_to_words
-        | ' get uris' >> beam.Keys()
-        | 'count words in doc' >> beam.combiners.Count.PerElement())
+        | 'GetUris 2' >> beam.Keys()
+        | 'CountWordsInDoc' >> beam.combiners.Count.PerElement())
 
     # Count, for each (URI, word) pair, the number of occurrences of that word
     # in the document associated with the URI.
     uri_and_word_to_count = (
         uri_to_words
-        | 'count word-doc pairs' >> beam.combiners.Count.PerElement())
+        | 'CountWord-DocPairs' >> beam.combiners.Count.PerElement())
 
     # Adjust the above collection to a mapping from (URI, word) pairs to counts
     # into an isomorphic mapping from URI to (word, count) pairs, to prepare
     # for a join by the URI key.
     uri_to_word_and_count = (
         uri_and_word_to_count
-        | 'shift keys' >> beam.Map(
+        | 'ShiftKeys' >> beam.Map(
             lambda ((uri, word), count): (uri, (word, count))))
 
     # Perform a CoGroupByKey (a sort of pre-join) on the prepared
@@ -118,7 +118,7 @@ class TfIdf(beam.PTransform):
     #                         ... ]}
     uri_to_word_and_count_and_total = (
         {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
-        | 'cogroup by uri' >> beam.CoGroupByKey())
+        | 'CoGroupByUri' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, term frequency) pair for each
     # URI. A word's term frequency for a document is simply the number of times
@@ -134,7 +134,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tf = (
         uri_to_word_and_count_and_total
-        | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency))
+        | 'ComputeTermFrequencies' >> beam.FlatMap(compute_term_frequency))
 
     # Compute a mapping from each word to its document frequency.
     # A word's document frequency in a corpus is the number of
@@ -149,7 +149,7 @@ class TfIdf(beam.PTransform):
     # DoFns in this way.
     word_to_df = (
         word_to_doc_count
-        | 'compute doc frequencies' >> beam.Map(
+        | 'ComputeDocFrequencies' >> beam.Map(
             lambda (word, count), total: (word, float(count) / total),
             AsSingleton(total_documents)))
 
@@ -157,7 +157,7 @@ class TfIdf(beam.PTransform):
     # each keyed on the word.
     word_to_uri_and_tf_and_df = (
         {'tf': word_to_uri_and_tf, 'df': word_to_df}
-        | 'cogroup words by tf-df' >> beam.CoGroupByKey())
+        | 'CoGroupWordsByTf-df' >> beam.CoGroupByKey())
 
     # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
     # There are a variety of definitions of TF-IDF
@@ -172,7 +172,7 @@ class TfIdf(beam.PTransform):
 
     word_to_uri_and_tfidf = (
         word_to_uri_and_tf_and_df
-        | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf))
+        | 'ComputeTf-idf' >> beam.FlatMap(compute_tf_idf))
 
     return word_to_uri_and_tfidf
 

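Note the removed labels ' get uris' and ' count uris' above relied on a stray
leading space to stay unique, because beam.Keys() is applied twice inside this
transform; the new labels make the disambiguation explicit ('GetUris 1',
'GetUris 2'). Reusing a label within one pipeline is rejected, as this sketch
against the Python SDK (illustrative data and labels) shows:

    import apache_beam as beam

    p = beam.Pipeline()
    nums = p | 'Create' >> beam.Create([1, 2, 3])
    doubled = nums | 'Double' >> beam.Map(lambda x: x * 2)
    try:
        # A second transform with the label 'Double' is rejected, which is
        # why distinct labels such as 'GetUris 1'/'GetUris 2' are needed.
        tripled = nums | 'Double' >> beam.Map(lambda x: x * 3)
    except RuntimeError as err:
        print(err)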
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 66cab77..5869976 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -30,14 +30,13 @@ class BigQuerySideInputTest(unittest.TestCase):
   def test_create_groups(self):
     p = TestPipeline()
 
-    group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C'])
-    corpus_pcoll = p | 'create_corpus' >> beam.Create(
+    group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
+    corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
         [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
-    words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'},
-                                                     {'f': 'word2'},
-                                                     {'f': 'word3'}])
-    ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1'])
-    ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1'])
+    words_pcoll = p | 'CreateWords' >> beam.Create(
+        [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+    ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
+    ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
 
     groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll,
                                                words_pcoll, ignore_corpus_pcoll,

http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/filters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py
index 7c77b9d..d13d823 100644
--- a/sdks/python/apache_beam/examples/cookbook/filters.py
+++ b/sdks/python/apache_beam/examples/cookbook/filters.py
@@ -53,21 +53,21 @@ def filter_cold_days(input_data, month_filter):
   projection_fields = ['year', 'month', 'day', 'mean_temp']
   fields_of_interest = (
       input_data
-      | 'projected' >> beam.Map(
+      | 'Projected' >> beam.Map(
           lambda row: {f: row[f] for f in projection_fields}))
 
   # Compute the global mean temperature.
   global_mean = AsSingleton(
       fields_of_interest
-      | 'extract mean' >> beam.Map(lambda row: row['mean_temp'])
-      | 'global mean' >> beam.combiners.Mean.Globally())
+      | 'ExtractMean' >> beam.Map(lambda row: row['mean_temp'])
+      | 'GlobalMean' >> beam.combiners.Mean.Globally())
 
   # Filter to the rows representing days in the month of interest
   # in which the mean daily temperature is below the global mean.
   return (
       fields_of_interest
-      | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter)
-      | 'below mean' >> beam.Filter(
+      | 'DesiredMonth' >> beam.Filter(lambda row: row['month'] == month_filter)
+      | 'BelowMean' >> beam.Filter(
           lambda row, mean: row['mean_temp'] < mean, global_mean))
 
 
@@ -92,7 +92,7 @@ def run(argv=None):
 
   # pylint: disable=expression-not-assigned
   (filter_cold_days(input_data, known_args.month_filter)
-   | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
+   | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
        known_args.output,
        schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

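filters.py hands the global mean to Filter as a side input: AsSingleton wraps
a one-element PCollection so it is delivered to the callable as an extra
argument at execution time. A minimal sketch of the same shape (data and
labels are illustrative):

    import apache_beam as beam
    from apache_beam.pvalue import AsSingleton

    with beam.Pipeline() as p:
        temps = p | 'Temps' >> beam.Create([10.0, 20.0, 30.0])
        # Wrap the one-element mean PCollection for use as a side input.
        mean = AsSingleton(
            temps | 'GlobalMean' >> beam.combiners.Mean.Globally())
        below = temps | 'BelowMean' >> beam.Filter(lambda t, m: t < m, mean)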
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 55bdc50..c880a9a 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -74,12 +74,12 @@ def run(argv=None, assert_results=None):
   # quotes/backslashes, and convert it to a PCollection of (key, value) pairs.
   def read_kv_textfile(label, textfile):
     return (p
-            | 'read_%s' % label >> ReadFromText(textfile)
-            | 'backslash_%s' % label >> beam.Map(
+            | 'Read: %s' % label >> ReadFromText(textfile)
+            | 'Backslash: %s' % label >> beam.Map(
                 lambda x: re.sub(r'\\', r'\\\\', x))
-            | 'escape_quotes_%s' % label >> beam.Map(
+            | 'EscapeQuotes: %s' % label >> beam.Map(
                 lambda x: re.sub(r'"', r'\"', x))
-            | 'split_%s' % label >> beam.Map(
+            | 'Split: %s' % label >> beam.Map(
                 lambda x: re.split(r'\t+', x, 1)))
 
   # Read input databases.
@@ -107,13 +107,13 @@ def run(argv=None, assert_results=None):
   nomads = grouped | beam.Filter(    # People without addresses.
       lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None))
 
-  num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally()
-  num_writers = writers | 'writers' >> beam.combiners.Count.Globally()
-  num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally()
+  num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
+  num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
+  num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
 
   # Write tab-delimited output.
   # pylint: disable=expression-not-assigned
-  tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv)
+  tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
 
   # TODO(silviuc): Move the assert_results logic to the unit test.
   if assert_results is not None:

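The lambdas in this file, e.g. lambda (name, (email, phone, snailmail)): ...,
use Python 2 tuple parameter unpacking, which PEP 3113 removed in Python 3;
there the equivalent unpacks inside the function body. An illustrative rewrite
of the nomads predicate under that assumption:

    def has_no_address(contact):
        # contact is a (name, (email, phone, snailmail)) tuple as produced
        # by the CoGroupByKey over the three input PCollections.
        name, (email, phone, snailmail) = contact
        return not next(iter(snailmail), None)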
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index e34a64e..7fb2c81 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -52,14 +52,14 @@ def run(argv=None):
 
   # Capitalize the characters in each line.
   transformed = (lines
-                 | 'split' >> (
+                 | 'Split' >> (
                      beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                      .with_output_types(unicode))
-                 | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+                 | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                  | beam.WindowInto(window.FixedWindows(15, 0))
-                 | 'group' >> beam.GroupByKey()
-                 | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-                 | 'format' >> beam.Map(lambda tup: '%s: %d' % tup))
+                 | 'Group' >> beam.GroupByKey()
+                 | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                 | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned

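This pipeline windows into 15-second fixed windows before the GroupByKey, so
each (word, count) pair is per word per window; window.FixedWindows(15, 0)
means 15-second windows starting at offset 0. A bounded sketch of the same
shape (input data and labels are illustrative):

    import apache_beam as beam
    from apache_beam.transforms import window

    with beam.Pipeline() as p:
        counts = (p
                  | 'Create' >> beam.Create([('cat', 1), ('cat', 1), ('dog', 1)])
                  | 'Window' >> beam.WindowInto(window.FixedWindows(15))
                  | 'Group' >> beam.GroupByKey()
                  | 'Count' >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))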
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 772e5e2..f75bc5d 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -762,8 +762,8 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
-    pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
+    pcoll = pipeline | beam.core.Create(self.lines)
+    pcoll | beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
     pipeline.run()
 
     read_result = []
@@ -775,8 +775,8 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native_auto_compression(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
-    pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
+    pcoll = pipeline | beam.core.Create(self.lines)
+    pcoll | beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
             self.path, file_name_suffix='.gz'))
     pipeline.run()
@@ -790,8 +790,8 @@ class TestNativeTextFileSink(unittest.TestCase):
 
   def test_write_native_auto_compression_unsharded(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
-    pcoll | 'Write' >> beam.Write(  # pylint: disable=expression-not-assigned
+    pcoll = pipeline | beam.core.Create(self.lines)
+    pcoll | beam.Write(  # pylint: disable=expression-not-assigned
         fileio.NativeTextFileSink(
             self.path + '.gz', shard_name_template=''))
     pipeline.run()

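The 'Create' and 'Write' labels dropped here (and in textio_test.py below)
were redundant: when no label is given, the step name defaults to one derived
from the transform itself. A short sketch of that behavior (assumed default
naming, illustrative data):

    import apache_beam as beam

    p = beam.Pipeline()
    pcoll = p | beam.Create([1, 2, 3])  # step name defaults to 'Create'
    # A second unlabeled Create would clash with the first, so it needs an
    # explicit label.
    other = p | 'CreateMore' >> beam.Create([4, 5])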
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 93421a6..12af3b6 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -727,7 +727,7 @@ class Write(ptransform.PTransform):
     from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
     if isinstance(self.sink, dataflow_io.NativeSink):
       # A native sink
-      return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink)
+      return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink)
     elif isinstance(self.sink, Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -748,7 +748,7 @@ class WriteImpl(ptransform.PTransform):
 
   def expand(self, pcoll):
     do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
-    init_result_coll = do_once | 'initialize_write' >> core.Map(
+    init_result_coll = do_once | 'InitializeWrite' >> core.Map(
         lambda _, sink: sink.initialize_write(), self.sink)
     if getattr(self.sink, 'num_shards', 0):
       min_shards = self.sink.num_shards
@@ -759,20 +759,20 @@ class WriteImpl(ptransform.PTransform):
       write_result_coll = (keyed_pcoll
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
-                           | 'write_bundles' >> core.Map(
+                           | 'WriteBundles' >> core.Map(
                                _write_keyed_bundle, self.sink,
                                AsSingleton(init_result_coll)))
     else:
       min_shards = 1
       write_result_coll = (pcoll
-                           | 'write_bundles' >>
+                           | 'WriteBundles' >>
                            core.ParDo(
                                _WriteBundleDoFn(), self.sink,
                                AsSingleton(init_result_coll))
-                           | 'pair' >> core.Map(lambda x: (None, x))
+                           | 'Pair' >> core.Map(lambda x: (None, x))
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
-                           | 'extract' >> core.FlatMap(lambda x: x[1]))
+                           | 'Extract' >> core.FlatMap(lambda x: x[1]))
     return do_once | core.FlatMap(
         'finalize_write',
         _finalize_write,

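The tail of this hunk still shows the older calling convention,
core.FlatMap('finalize_write', _finalize_write, ...), where the label is the
first positional argument; the pvalue | 'Label' >> transform form used
everywhere else in this commit is its replacement. An illustrative
side-by-side (the old form is shown only in comments):

    import apache_beam as beam

    with beam.Pipeline() as p:
        nums = p | beam.Create([1, 2, 3])
        # Old convention, label as first argument:
        #   nums | beam.FlatMap('Expand', lambda x: [x, -x])
        # Replacement '>>' form:
        expanded = nums | 'Expand' >> beam.FlatMap(lambda x: [x, -x])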
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index ccb64ff..4b85584 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -524,7 +524,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+    pcoll = pipeline | beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path)  # pylint: disable=expression-not-assigned
     pipeline.run()
 
@@ -537,7 +537,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow_auto_compression(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+    pcoll = pipeline | beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz')  # pylint: disable=expression-not-assigned
     pipeline.run()
 
@@ -550,7 +550,7 @@ class TextSinkTest(unittest.TestCase):
 
   def test_write_dataflow_auto_compression_unsharded(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
+    pcoll = pipeline | beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='')  # pylint: disable=expression-not-assigned
     pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index f95b295..f522590 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -84,9 +84,9 @@ class RunnerTest(unittest.TestCase):
     p = Pipeline(remote_runner,
                  options=PipelineOptions(self.default_properties))
 
-    (p | 'create' >> ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
-     | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
-     | 'gbk' >> ptransform.GroupByKey())
+    (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
+     | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
+     | ptransform.GroupByKey())
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowRunner, remote_runner).run(p)
 
@@ -118,8 +118,8 @@ class RunnerTest(unittest.TestCase):
 
     now = datetime.now()
     # pylint: disable=expression-not-assigned
-    (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
-     | 'do' >> SpecialParDo(SpecialDoFn(), now))
+    (p | ptransform.Create([1, 2, 3, 4, 5])
+     | 'Do' >> SpecialParDo(SpecialDoFn(), now))
 
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowRunner, remote_runner).run(p)
@@ -166,8 +166,8 @@ class RunnerTest(unittest.TestCase):
     p = Pipeline(runner,
                  options=PipelineOptions(self.default_properties))
     # pylint: disable=expression-not-assigned
-    (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
-     | 'do' >> beam.ParDo(MyDoFn()))
+    (p | ptransform.Create([1, 2, 3, 4, 5])
+     | 'Do' >> beam.ParDo(MyDoFn()))
     result = p.run()
     result.wait_until_finish()
     metrics = result.metrics().query()
@@ -178,19 +178,19 @@ class RunnerTest(unittest.TestCase):
         metrics['counters'],
         hc.contains_inanyorder(
             MetricResult(
-                MetricKey('do', MetricName(namespace, 'elements')),
+                MetricKey('Do', MetricName(namespace, 'elements')),
                 5, 5),
             MetricResult(
-                MetricKey('do', MetricName(namespace, 'bundles')),
+                MetricKey('Do', MetricName(namespace, 'bundles')),
                 1, 1),
             MetricResult(
-                MetricKey('do', MetricName(namespace, 'finished_bundles')),
+                MetricKey('Do', MetricName(namespace, 'finished_bundles')),
                 1, 1)))
     hc.assert_that(
         metrics['distributions'],
         hc.contains_inanyorder(
             MetricResult(
-                MetricKey('do', MetricName(namespace, 'element_dist')),
+                MetricKey('Do', MetricName(namespace, 'element_dist')),
                 DistributionResult(DistributionData(15, 5, 1, 5)),
                 DistributionResult(DistributionData(15, 5, 1, 5)))))
 
@@ -205,8 +205,7 @@ class RunnerTest(unittest.TestCase):
                      '--temp_location=/dev/null',
                      '--no_auth=True'
                  ]))
-    rows = p | 'read' >> beam.io.Read(
-        beam.io.BigQuerySource('dataset.faketable'))
+    rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
     with self.assertRaises(ValueError,
                            msg=('Coder for the GroupByKey operation'
                                 '"GroupByKey" is not a key-value coder: '

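The metric assertions above change from MetricKey('do', ...) to
MetricKey('Do', ...) because metric results are keyed by the label of the
step that reported them. A minimal sketch of a DoFn reporting a counter under
its step label (namespace and names are illustrative):

    import apache_beam as beam
    from apache_beam.metrics import Metrics

    class CountElements(beam.DoFn):
        def __init__(self):
            self.elements = Metrics.counter('my_namespace', 'elements')

        def process(self, element):
            self.elements.inc()
            yield element

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(CountElements())
    result = p.run()
    result.wait_until_finish()
    # result.metrics().query() reports this counter under step name 'Do'.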
http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 13b963c..827bc83 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,8 +54,8 @@ class PTransformTest(unittest.TestCase):
                      str(PTransform()))
 
     pa = TestPipeline()
-    res = pa | 'a_label' >> beam.Create([1, 2])
-    self.assertEqual('AppliedPTransform(a_label, Create)',
+    res = pa | 'ALabel' >> beam.Create([1, 2])
+    self.assertEqual('AppliedPTransform(ALabel, Create)',
                      str(res.producer))
 
     pc = TestPipeline()
@@ -111,8 +111,8 @@ class PTransformTest(unittest.TestCase):
         return [context.element + addon]
 
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
@@ -123,39 +123,39 @@ class PTransformTest(unittest.TestCase):
         pass
 
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
-      pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
+      pcoll | 'Do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
     pipeline = TestPipeline()
-    side = pipeline | 'side' >> beam.Create([10])
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    result = pcoll | beam.FlatMap(
-        'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
+    side = pipeline | 'Side' >> beam.Create([10])
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.FlatMap(
+        lambda x, addon: [x + addon], pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_keyword_arg(self):
     pipeline = TestPipeline()
-    side = pipeline | 'side' >> beam.Create([10])
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    result = pcoll | beam.FlatMap(
-        'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
+    side = pipeline | 'Side' >> beam.Create([10])
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.FlatMap(
+        lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
-    pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
+    pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'Do' >> beam.FlatMap(lambda x: x + '1')
 
     # Since the DoFn directly returns a string we should get an error warning
     # us.
@@ -168,8 +168,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
-    pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
+    pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'Do' >> beam.FlatMap(lambda x: {x: '1'})
 
     # Since the DoFn directly returns a dict we should get an error warning
     # us.
@@ -182,9 +182,9 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_side_outputs_maintains_unique_name(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
-    r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
+    r2 = pcoll | 'B' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
     assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
     assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
     pipeline.run()
@@ -195,8 +195,8 @@ class PTransformTest(unittest.TestCase):
     def incorrect_par_do_fn(x):
       return x + 5
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
-    pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
+    pcoll = pipeline | 'Start' >> beam.Create([2, 9, 3])
+    pcoll | 'Do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions passed to a ParDo
     # return an iterable.
     with self.assertRaises(typehints.TypeCheckError) as cm:
@@ -216,8 +216,8 @@ class PTransformTest(unittest.TestCase):
       def finish_bundle(self, c):
         yield 'finish'
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
-    result = pcoll | 'do' >> beam.ParDo(MyDoFn())
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'Do' >> beam.ParDo(MyDoFn())
 
     # May have many bundles, but each has a start and finish.
     def matcher():
@@ -231,9 +231,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_filter(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
-    result = pcoll | beam.Filter(
-        'filter', lambda x: x % 2 == 0)
+    pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3, 4])
+    result = pcoll | 'Filter' >> beam.Filter(lambda x: x % 2 == 0)
     assert_that(result, equal_to([2, 4]))
     pipeline.run()
 
@@ -257,15 +256,15 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(vals)
-    result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
+    pcoll = pipeline | 'Start' >> beam.Create(vals)
+    result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
     pipeline.run()
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(vals)
+    pcoll = pipeline | 'Start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
     pipeline.run()
@@ -273,10 +272,9 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(values)
-    divisor = pipeline | 'divisor' >> beam.Create([2])
-    result = pcoll | beam.CombineGlobally(
-        'max',
+    pcoll = pipeline | 'Start' >> beam.Create(values)
+    divisor = pipeline | 'Divisor' >> beam.Create([2])
+    result = pcoll | 'Max' >> beam.CombineGlobally(
         # Multiples of divisor only.
         lambda vals, d: max(v for v in vals if v % d == 0),
         pvalue.AsSingleton(divisor)).without_defaults()
@@ -288,9 +286,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
-    result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
+    result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn())
     assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
                                   ('b', sum(vals_2) / len(vals_2))]))
     pipeline.run()
@@ -299,7 +297,7 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
     assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
@@ -309,9 +307,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
                                                [('b', x) for x in vals_2]))
-    divisor = pipeline | 'divisor' >> beam.Create([2])
+    divisor = pipeline | 'Divisor' >> beam.Create([2])
     result = pcoll | beam.CombinePerKey(
         lambda vals, d: max(v for v in vals if v % d == 0),
         pvalue.AsSingleton(divisor))  # Multiples of divisor only.
@@ -324,7 +322,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
-    result = pcoll | 'group' >> beam.GroupByKey()
+    result = pcoll | 'Group' >> beam.GroupByKey()
     assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
     pipeline.run()
 
@@ -336,9 +334,9 @@ class PTransformTest(unittest.TestCase):
         return (context.element % 3) + offset
 
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
-    partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
+    partitions = pcoll | 'Part 1' >> beam.Partition(SomePartitionFn(), 4, 1)
     assert_that(partitions[0], equal_to([]))
     assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
     assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
@@ -348,14 +346,14 @@ class PTransformTest(unittest.TestCase):
     # Check that a bad partition label will yield an error. For the
     # DirectRunner, this error manifests as an exception.
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
-    partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
+    pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+    partitions = pcoll | 'Part 2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
             'part',
@@ -380,48 +378,47 @@ class PTransformTest(unittest.TestCase):
 
   def test_flatten_pcollections(self):
     pipeline = TestPipeline()
-    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
-    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
-    result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
+    pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7])
+    result = (pcoll_1, pcoll_2) | 'Flatten' >> beam.Flatten()
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
     pipeline = TestPipeline()
     with self.assertRaises(ValueError):
-      () | 'pipeline arg missing' >> beam.Flatten()
-    result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
+      () | 'PipelineArgMissing' >> beam.Flatten()
+    result = () | 'Empty' >> beam.Flatten(pipeline=pipeline)
     assert_that(result, equal_to([]))
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
     pipeline = TestPipeline()
-    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
-    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
-    result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
-              | 'flatten' >> beam.Flatten())
+    pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7])
+    result = [pcoll for pcoll in (pcoll_1, pcoll_2)] | beam.Flatten()
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_input_type_must_be_iterable(self):
     # Inputs to flatten *must* be an iterable.
     with self.assertRaises(ValueError):
-      4 | 'flatten' >> beam.Flatten()
+      4 | beam.Flatten()
 
   def test_flatten_input_type_must_be_iterable_of_pcolls(self):
     # Inputs to flatten *must* be an iterable of PCollections.
     with self.assertRaises(TypeError):
-      {'l': 'test'} | 'flatten' >> beam.Flatten()
+      {'l': 'test'} | beam.Flatten()
     with self.assertRaises(TypeError):
-      set([1, 2, 3]) | 'flatten' >> beam.Flatten()
+      set([1, 2, 3]) | beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
     pipeline = TestPipeline()
-    pcoll_1 = pipeline | beam.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | beam.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey()
+    pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+        [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+        [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey()
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -429,12 +426,11 @@ class PTransformTest(unittest.TestCase):
 
   def test_co_group_by_key_on_iterable(self):
     pipeline = TestPipeline()
-    pcoll_1 = pipeline | beam.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | beam.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = ([pc for pc in (pcoll_1, pcoll_2)]
-              | 'cgbk' >> beam.CoGroupByKey())
+    pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+        [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+        [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -442,11 +438,11 @@ class PTransformTest(unittest.TestCase):
 
   def test_co_group_by_key_on_dict(self):
     pipeline = TestPipeline()
-    pcoll_1 = pipeline | beam.Create(
-        'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
-    pcoll_2 = pipeline | beam.Create(
-        'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey()
+    pcoll_1 = pipeline | 'Start 1' >> beam.Create(
+        [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
+    pcoll_2 = pipeline | 'Start 2' >> beam.Create(
+        [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
+    result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
     assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
                                   ('b', {'X': [3], 'Y': []}),
                                   ('c', {'X': [4], 'Y': [7, 8]})]))
@@ -478,8 +474,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_keys_and_values(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.Create(
-        'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
+    pcoll = pipeline | 'Start' >> beam.Create(
+        [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
     keys = pcoll.apply('keys', beam.Keys())
     vals = pcoll.apply('vals', beam.Values())
     assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
@@ -488,16 +484,16 @@ class PTransformTest(unittest.TestCase):
 
   def test_kv_swap(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.Create(
-        'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
+    pcoll = pipeline | 'Start' >> beam.Create(
+        [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)])
     result = pcoll.apply('swap', beam.KvSwap())
     assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
     pipeline.run()
 
   def test_remove_duplicates(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.Create(
-        'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+    pcoll = pipeline | 'Start' >> beam.Create(
+        [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
     result = pcoll.apply('nodupes', beam.RemoveDuplicates())
     assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
     pipeline.run()
@@ -507,15 +503,15 @@ class PTransformTest(unittest.TestCase):
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
-    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
+    result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
     assert_that(result, equal_to([('a', 2), ('b', 1)]))
     pipeline.run()
 
   def test_apply_to_list(self):
     self.assertItemsEqual(
-        [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
+        [1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1))
     self.assertItemsEqual([1],
-                          [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
+                          [0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2))
     self.assertItemsEqual([1, 2, 100, 3],
                           ([1, 2, 3], [100]) | beam.Flatten())
     join_input = ([('k', 'a')],
@@ -575,47 +571,47 @@ class PTransformLabelsTest(unittest.TestCase):
     pardo = None
 
     def expand(self, pcoll):
-      self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1])
+      self.pardo = '*Do*' >> beam.FlatMap(lambda x: [x + 1])
       return pcoll | self.pardo
 
   def test_chained_ptransforms(self):
     """Tests that chaining gets proper nesting."""
     pipeline = TestPipeline()
-    map1 = 'map1' >> beam.Map(lambda x: (x, 1))
-    gbk = 'gbk' >> beam.GroupByKey()
-    map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
+    map1 = 'Map1' >> beam.Map(lambda x: (x, 1))
+    gbk = 'Gbk' >> beam.GroupByKey()
+    map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
     t = (map1 | gbk | map2)
-    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
-    self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
-    self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
-    self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
+    result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
+    self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels)
+    self.assertTrue('Map1|Gbk|Map2/Gbk' in pipeline.applied_labels)
+    self.assertTrue('Map1|Gbk|Map2/Map2' in pipeline.applied_labels)
     assert_that(result, equal_to([('a', 2), ('b', 1)]))
     pipeline.run()
 
   def test_apply_custom_transform_without_label(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
+    pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
     self.assertTrue('CustomTransform' in pipeline.applied_labels)
-    self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels)
+    self.assertTrue('CustomTransform/*Do*' in pipeline.applied_labels)
     assert_that(result, equal_to([2, 3, 4]))
     pipeline.run()
 
   def test_apply_custom_transform_with_label(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
-    custom = PTransformLabelsTest.CustomTransform('*custom*')
+    pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
+    custom = PTransformLabelsTest.CustomTransform('*Custom*')
     result = pipeline.apply(custom, pcoll)
-    self.assertTrue('*custom*' in pipeline.applied_labels)
-    self.assertTrue('*custom*/*do*' in pipeline.applied_labels)
+    self.assertTrue('*Custom*' in pipeline.applied_labels)
+    self.assertTrue('*Custom*/*Do*' in pipeline.applied_labels)
     assert_that(result, equal_to([2, 3, 4]))
     pipeline.run()
 
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(vals)
+    pcoll = pipeline | 'Start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
     self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
@@ -624,28 +620,28 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_ptransform_using_decorator(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
-    sample = SamplePTransform('*sample*')
+    pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3])
+    sample = SamplePTransform('*Sample*')
     _ = pcoll | sample
-    self.assertTrue('*sample*' in pipeline.applied_labels)
-    self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels)
-    self.assertTrue('*sample*/Group' in pipeline.applied_labels)
-    self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels)
+    self.assertTrue('*Sample*' in pipeline.applied_labels)
+    self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
+    self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
+    self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
 
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create(vals)
-    combine = '*sum*' >> beam.CombineGlobally(sum)
+    pcoll = pipeline | 'Start' >> beam.Create(vals)
+    combine = '*Sum*' >> beam.CombineGlobally(sum)
     result = pcoll | combine
-    self.assertTrue('*sum*' in pipeline.applied_labels)
+    self.assertTrue('*Sum*' in pipeline.applied_labels)
     assert_that(result, equal_to([sum(vals)]))
     pipeline.run()
 
   def check_label(self, ptransform, expected_label):
     pipeline = TestPipeline()
-    pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
-    actual_label = sorted(pipeline.applied_labels - {'start'})[0]
+    pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
+    actual_label = sorted(pipeline.applied_labels - {'Start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
 
   def test_default_labels(self):
@@ -737,8 +733,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + five]
 
     d = (self.p
-         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-         | 'add' >> beam.ParDo(AddWithFive(), 5))
+         | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'Add' >> beam.ParDo(AddWithFive(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -752,10 +748,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-       | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
+       | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
 
-    self.assertEqual("Type hint violation for 'upper': "
+    self.assertEqual("Type hint violation for 'Upper': "
                      "requires <type 'str'> but got <type 'int'> for context",
                      e.exception.message)
 
@@ -769,8 +765,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + num]
 
     d = (self.p
-         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-         | 'add' >> beam.ParDo(AddWithNum(), 5))
+         | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'Add' >> beam.ParDo(AddWithNum(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -786,11 +782,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 't' >> beam.Create(['1', '2', '3']).with_output_types(str)
-       | 'add' >> beam.ParDo(AddWithNum(), 5))
+       | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str)
+       | 'Add' >> beam.ParDo(AddWithNum(), 5))
       self.p.run()
 
-    self.assertEqual("Type hint violation for 'add': "
+    self.assertEqual("Type hint violation for 'Add': "
                      "requires <type 'int'> but got <type 'str'> for context",
                      e.exception.message)
 
@@ -804,10 +800,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # will receive a str instead, which should result in a raised exception.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
-       | 'to str' >> beam.FlatMap(int_to_str))
+       | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
+       | 'ToStr' >> beam.FlatMap(int_to_str))
 
-    self.assertEqual("Type hint violation for 'to str': "
+    self.assertEqual("Type hint violation for 'ToStr': "
                      "requires <type 'int'> but got <type 'str'> for a",
                      e.exception.message)
 
@@ -819,8 +815,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks, no error should be raised.
     d = (self.p
-         | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'case' >> beam.FlatMap(to_all_upper_case))
+         | 'T' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'Case' >> beam.FlatMap(to_all_upper_case))
     assert_that(d, equal_to(['T', 'E', 'S', 'T']))
     self.p.run()
 
@@ -833,23 +829,23 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # expecting pcoll's of type str instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-       | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
+       | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | ('Score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2])
           .with_input_types(str).with_output_types(int))
-       | ('upper' >> beam.FlatMap(lambda x: [x.upper()])
+       | ('Upper' >> beam.FlatMap(lambda x: [x.upper()])
           .with_input_types(str).with_output_types(str)))
 
-    self.assertEqual("Type hint violation for 'upper': "
+    self.assertEqual("Type hint violation for 'Upper': "
                      "requires <type 'str'> but got <type 'int'> for x",
                      e.exception.message)
 
   def test_pardo_properly_type_checks_using_type_hint_methods(self):
     # Pipeline should be created successfully without an error
     d = (self.p
-         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'dup' >> beam.FlatMap(lambda x: [x + x])
+         | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'Dup' >> beam.FlatMap(lambda x: [x + x])
          .with_input_types(str).with_output_types(str)
-         | 'upper' >> beam.FlatMap(lambda x: [x.upper()])
+         | 'Upper' >> beam.FlatMap(lambda x: [x.upper()])
          .with_input_types(str).with_output_types(str))
 
     assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
@@ -860,19 +856,19 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # int's, while Map is expecting one of str.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-       | 'upper' >> beam.Map(lambda x: x.upper())
+       | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'Upper' >> beam.Map(lambda x: x.upper())
        .with_input_types(str).with_output_types(str))
 
-    self.assertEqual("Type hint violation for 'upper': "
+    self.assertEqual("Type hint violation for 'Upper': "
                      "requires <type 'str'> but got <type 'int'> for x",
                      e.exception.message)
 
   def test_map_properly_type_checks_using_type_hints_methods(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-         | 'to_str' >> beam.Map(lambda x: str(x))
+         | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+         | 'ToStr' >> beam.Map(lambda x: str(x))
          .with_input_types(int).with_output_types(str))
     assert_that(d, equal_to(['1', '2', '3', '4']))
     self.p.run()
@@ -887,10 +883,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # However, 'Map' should detect that Create has hinted an int instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-       | 'upper' >> beam.Map(upper))
+       | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'Upper' >> beam.Map(upper))
 
-    self.assertEqual("Type hint violation for 'upper': "
+    self.assertEqual("Type hint violation for 'Upper': "
                      "requires <type 'str'> but got <type 'int'> for s",
                      e.exception.message)
 
@@ -912,12 +908,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # incoming.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
-       | 'lower' >> beam.Map(lambda x: x.lower())
+       | 'Strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+       | 'Lower' >> beam.Map(lambda x: x.lower())
        .with_input_types(str).with_output_types(str)
-       | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
+       | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
 
-    self.assertEqual("Type hint violation for 'below 3': "
+    self.assertEqual("Type hint violation for 'Below 3': "
                      "requires <type 'int'> but got <type 'str'> for x",
                      e.exception.message)
 
@@ -925,9 +921,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # No error should be raised if this type-checks properly.
     d = (self.p
          | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
-         | 'to int' >> beam.Map(lambda x: int(x))
+         | 'ToInt' >> beam.Map(lambda x: int(x))
          .with_input_types(str).with_output_types(int)
-         | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
+         | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
     assert_that(d, equal_to([1, 2]))
     self.p.run()
 
@@ -939,10 +935,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Func above was hinted to only take a float, yet an int will be passed.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
-       | 'half' >> beam.Filter(more_than_half))
+       | 'Ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'Half' >> beam.Filter(more_than_half))
 
-    self.assertEqual("Type hint violation for 'half': "
+    self.assertEqual("Type hint violation for 'Half': "
                      "requires <type 'float'> but got <type 'int'> for a",
                      e.exception.message)
 
@@ -954,15 +950,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # Filter should deduce that it returns the same type that it takes.
     (self.p
-     | 'str' >> beam.Create(range(5)).with_output_types(int)
-     | 'half' >> beam.Filter(half)
-     | 'to bool' >> beam.Map(lambda x: bool(x))
+     | 'Str' >> beam.Create(range(5)).with_output_types(int)
+     | 'Half' >> beam.Filter(half)
+     | 'ToBool' >> beam.Map(lambda x: bool(x))
      .with_input_types(int).with_output_types(bool))
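
The hunk above relies on Filter deducing that its output type equals its input type. A small sketch of that behavior, assuming the typehints decorators from this SDK; 'is_positive' is an illustrative predicate:

import apache_beam as beam
from apache_beam import typehints

@typehints.with_input_types(int)
def is_positive(x):
  return x > 0

p = beam.Pipeline('DirectRunner')
(p
 | 'Nums' >> beam.Create([-2, -1, 0, 1, 2]).with_output_types(int)
 | 'Positive' >> beam.Filter(is_positive)  # output type deduced as int
 | 'ToBool' >> beam.Map(lambda x: bool(x))
   .with_input_types(int).with_output_types(bool))
p.run()
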
 
   def test_group_by_key_only_output_type_deduction(self):
     d = (self.p
-         | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | ('pair' >> beam.Map(lambda x: (x, ord(x)))
+         | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
          | beam.GroupByKeyOnly())
 
@@ -973,8 +969,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_group_by_key_output_type_deduction(self):
     d = (self.p
-         | 'str' >> beam.Create(range(20)).with_output_types(int)
-         | ('pair negative' >> beam.Map(lambda x: (x % 5, -x))
+         | 'Str' >> beam.Create(range(20)).with_output_types(int)
+         | ('PairNegative' >> beam.Map(lambda x: (x % 5, -x))
             .with_output_types(typehints.KV[int, int]))
          | beam.GroupByKey())
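
GroupByKey's output-type deduction from a KV hint, as exercised by the test above, in standalone form (a sketch against the same SDK; 'Chars' and 'Pair' are illustrative labels):

import apache_beam as beam
from apache_beam import typehints

p = beam.Pipeline('DirectRunner')
grouped = (p
           | 'Chars' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
           | 'Pair' >> beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, int])
           | beam.GroupByKey())
# grouped.element_type is deduced as a (str, Iterable[int]) pair.
p.run()
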
 
@@ -1016,11 +1012,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to the ParDo.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'nums' >> beam.Create(range(5))
-       | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x)))
+       | 'Nums' >> beam.Create(range(5))
+       | 'ModDup' >> beam.FlatMap(lambda x: (x % 2, x)))
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
-                     'type-hint was found for the PTransform Create(nums)',
+                     'type-hint was found for the PTransform Create(Nums)',
                      e.exception.message)
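
The renamed error message above comes from construction-time checking when an upstream transform lacks an output hint. A hedged sketch of triggering it (same SDK assumed):

import apache_beam as beam
from apache_beam import typehints

p = beam.Pipeline('DirectRunner')
try:
  _ = (p
       | 'Nums' >> beam.Create(range(5))  # no output type-hint supplied
       | 'ModDup' >> beam.FlatMap(lambda x: (x % 2, x)))
except typehints.TypeCheckError as e:
  print str(e)  # no output type-hint was found for Create(Nums)
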
 
   def test_pipeline_checking_gbk_insufficient_type_information(self):
@@ -1029,13 +1025,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to GBK-only.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'nums' >> beam.Create(range(5)).with_output_types(int)
-       | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
+       | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+       | 'ModDup' >> beam.Map(lambda x: (x % 2, x))
        | beam.GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
-                     'ParDo(mod dup)',
+                     'ParDo(ModDup)',
                      e.exception.message)
 
   def test_disable_pipeline_type_check(self):
@@ -1044,8 +1040,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # The pipeline below should raise a TypeError; however, pipeline type
     # checking was disabled above.
     (self.p
-     | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-     | 'lower' >> beam.Map(lambda x: x.lower())
+     | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+     | 'Lower' >> beam.Map(lambda x: x.lower())
      .with_input_types(str).with_output_types(str))
 
   def test_run_time_type_checking_enabled_type_violation(self):
@@ -1060,14 +1056,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Function above has been type-hinted to only accept an int. But in the
     # pipeline execution it'll be passed a string due to the output of Create.
     (self.p
-     | 't' >> beam.Create(['some_string'])
-     | 'to str' >> beam.Map(int_to_string))
+     | 'T' >> beam.Create(['some_string'])
+     | 'ToStr' >> beam.Map(int_to_string))
     with self.assertRaises(typehints.TypeCheckError) as e:
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(to str): "
+        "Runtime type violation detected within ParDo(ToStr): "
         "Type-hint for argument: 'x' violated. "
         "Expected an instance of <type 'int'>, "
         "instead found some_string, an instance of <type 'str'>.")
@@ -1084,9 +1080,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Pipeline checking is off, but the above function should satisfy types at
     # run-time.
     result = (self.p
-              | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
+              | 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
               .with_output_types(str)
-              | 'gen keys' >> beam.Map(group_with_upper_ord)
+              | 'GenKeys' >> beam.Map(group_with_upper_ord)
               | 'O' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(1, ['g']),
@@ -1106,9 +1102,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2, a)
 
     (self.p
-     | 'nums' >> beam.Create(range(5)).with_output_types(int)
-     | 'is even' >> beam.Map(is_even_as_key)
-     | 'parity' >> beam.GroupByKey())
+     | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+     | 'IsEven' >> beam.Map(is_even_as_key)
+     | 'Parity' >> beam.GroupByKey())
 
     # Although all the types appear to be correct when checked at pipeline
     # construction, runtime type-checking should detect that 'is_even_as_key' is
@@ -1118,7 +1114,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(is even): "
+        "Runtime type violation detected within ParDo(IsEven): "
         "Tuple[bool, int] hint type-constraint violated. "
         "The type of element #0 in the passed tuple is incorrect. "
         "Expected an instance of type bool, "
@@ -1135,9 +1131,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2 == 0, a)
 
     result = (self.p
-              | 'nums' >> beam.Create(range(5)).with_output_types(int)
-              | 'is even' >> beam.Map(is_even_as_key)
-              | 'parity' >> beam.GroupByKey())
+              | 'Nums' >> beam.Create(range(5)).with_output_types(int)
+              | 'IsEven' >> beam.Map(is_even_as_key)
+              | 'Parity' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
     self.p.run()
@@ -1152,13 +1148,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3])
-       | ('to int' >> beam.FlatMap(lambda x: [int(x)])
+       | ('ToInt' >> beam.FlatMap(lambda x: [int(x)])
           .with_input_types(str).with_output_types(int)))
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(to int): "
+        "Runtime type violation detected within ParDo(ToInt): "
         "Type-hint for argument: 'x' violated. "
         "Expected an instance of <type 'str'>, "
         "instead found 1, an instance of <type 'int'>.")
@@ -1170,14 +1166,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | ('add' >> beam.FlatMap(lambda (x, y): [x + y])
+       | ('Add' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(add): "
+        "Runtime type violation detected within ParDo(Add): "
         "Type-hint for argument: 'y' violated. "
         "Expected an instance of <type 'int'>, "
         "instead found 3.0, an instance of <type 'float'>.")
@@ -1189,14 +1185,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # The type-hint applied via the 'returns()' method indicates the ParDo
     # should output an instance of type 'int'; however, a 'float' will be
     # generated instead.
-    print "HINTS", beam.FlatMap(
-        'to int',
+    print "HINTS", ('ToInt' >> beam.FlatMap(
         lambda x: [float(x)]).with_input_types(int).with_output_types(
-            int).get_type_hints()
+            int)).get_type_hints()
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3])
-       | ('to int' >> beam.FlatMap(lambda x: [float(x)])
+       | ('ToInt' >> beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
       self.p.run()
@@ -1204,7 +1199,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(to int): "
+        "ParDo(ToInt): "
         "According to type-hint expected output should be "
         "of type <type 'int'>. Instead, received '1.0', "
         "an instance of type <type 'float'>.")
@@ -1219,7 +1214,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | ('swap' >> beam.FlatMap(lambda (x, y): [x + y])
+       | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )
@@ -1228,7 +1223,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(swap): Tuple type constraint violated. "
+        "ParDo(Swap): Tuple type constraint violated. "
         "Valid object instance must be of type 'tuple'. Instead, "
         "an instance of 'float' was received.")
 
@@ -1242,12 +1237,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return a + b
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
+      (self.p | beam.Create([1, 2, 3, 4]) | 'Add 1' >> beam.Map(add, 1.0))
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(add 1): "
+        "Runtime type violation detected within ParDo(Add 1): "
         "Type-hint for argument: 'b' violated. "
         "Expected an instance of <type 'int'>, "
         "instead found 1.0, an instance of <type 'float'>.")
@@ -1259,14 +1254,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3, 4])
-       | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0)
+       | ('Add 1' >> beam.Map(lambda x, one: x + one, 1.0)
           .with_input_types(int, int)
           .with_output_types(float)))
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
-        "Runtime type violation detected within ParDo(add 1): "
+        "Runtime type violation detected within ParDo(Add 1): "
         "Type-hint for argument: 'one' violated. "
         "Expected an instance of <type 'int'>, "
         "instead found 1.0, an instance of <type 'float'>.")
@@ -1278,8 +1273,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return sum(ints)
 
     d = (self.p
-         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-         | 'sum' >> beam.CombineGlobally(sum_ints))
+         | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'Sum' >> beam.CombineGlobally(sum_ints))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([6]))
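
CombineGlobally deduces its output type from the combining function's hints, as the test above checks. A standalone sketch (decorator names per this SDK's typehints module):

import apache_beam as beam
from apache_beam import typehints

@typehints.with_input_types(typehints.Iterable[int])
@typehints.with_output_types(int)
def sum_ints(ints):
  return sum(ints)

p = beam.Pipeline('DirectRunner')
d = (p
     | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
     | 'Sum' >> beam.CombineGlobally(sum_ints))
# d.element_type is int, deduced from sum_ints' output hint.
p.run()
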
@@ -1293,8 +1288,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'm' >> beam.Create([1, 2, 3]).with_output_types(int)
-       | 'add' >> beam.CombineGlobally(bad_combine))
+       | 'M' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'Add' >> beam.CombineGlobally(bad_combine))
 
     self.assertEqual(
         "All functions for a Combine PTransform must accept a "
@@ -1314,9 +1309,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return list(range(n+1))
 
     d = (self.p
-         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
-         | 'sum' >> beam.CombineGlobally(sum_ints)
-         | 'range' >> beam.ParDo(range_from_zero))
+         | 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'Sum' >> beam.CombineGlobally(sum_ints)
+         | 'Range' >> beam.ParDo(range_from_zero))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
@@ -1331,8 +1326,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return reduce(operator.mul, ints, 1)
 
     d = (self.p
-         | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
-         | 'mul' >> beam.CombineGlobally(iter_mul))
+         | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+         | 'Mul' >> beam.CombineGlobally(iter_mul))
 
     assert_that(d, equal_to([625]))
     self.p.run()
@@ -1349,14 +1344,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
-       | 'mul' >> beam.CombineGlobally(iter_mul))
+       | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+       | 'Mul' >> beam.CombineGlobally(iter_mul))
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+        "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
         "Tuple[TypeVariable[K], int] hint type-constraint violated. "
         "The type of element #1 in the passed tuple is incorrect. "
         "Expected an instance of type int, "
@@ -1381,7 +1376,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
-         | ('sum' >> beam.CombineGlobally(lambda s: sum(s))
+         | ('Sum' >> beam.CombineGlobally(lambda s: sum(s))
             .with_input_types(int).with_output_types(int)))
 
     assert_that(d, equal_to([10]))
@@ -1391,10 +1386,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(range(3)).with_output_types(int)
-       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
 
-    self.assertEqual("Input type hint violation at sort join: "
+    self.assertEqual("Input type hint violation at SortJoin: "
                      "expected <type 'str'>, got <type 'int'>",
                      e.exception.message)
 
@@ -1405,14 +1400,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(range(3)).with_output_types(int)
-       | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(sort join/KeyWithVoid): "
+        "ParDo(SortJoin/KeyWithVoid): "
         "Type-hint for argument: 'v' violated. "
         "Expected an instance of <type 'str'>, "
         "instead found 0, an instance of <type 'int'>.")
@@ -1422,20 +1417,20 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'e' >> beam.Create(range(3)).with_output_types(int)
-       | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
-       | 'f' >> beam.Map(lambda x: x + 1))
+       | 'E' >> beam.Create(range(3)).with_output_types(int)
+       | 'SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | 'F' >> beam.Map(lambda x: x + 1))
 
     self.assertEqual(
         'Pipeline type checking is enabled, '
         'however no output type-hint was found for the PTransform '
-        'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+        'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
         e.exception.message)
 
   def test_mean_globally_pipeline_checking_satisfied(self):
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'mean' >> combine.Mean.Globally())
+         | 'C' >> beam.Create(range(5)).with_output_types(int)
+         | 'Mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
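
Mean.Globally's deduced float output, shown above, in standalone form (a sketch against the same SDK):

import apache_beam as beam
from apache_beam.transforms import combine

p = beam.Pipeline('DirectRunner')
d = (p
     | 'C' >> beam.Create(range(5)).with_output_types(int)
     | 'Mean' >> combine.Mean.Globally())
# d.element_type is float: the mean of ints is a float.
p.run()
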
@@ -1444,8 +1439,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_globally_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'c' >> beam.Create(['test']).with_output_types(str)
-       | 'mean' >> combine.Mean.Globally())
+       | 'C' >> beam.Create(['test']).with_output_types(str)
+       | 'Mean' >> combine.Mean.Globally())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1457,8 +1452,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'c' >> beam.Create(range(5)).with_output_types(int)
-         | 'mean' >> combine.Mean.Globally())
+         | 'C' >> beam.Create(range(5)).with_output_types(int)
+         | 'Mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
@@ -1470,8 +1465,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-       | 'mean' >> combine.Mean.Globally())
+       | 'C' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | 'Mean' >> combine.Mean.Globally())
       self.p.run()
       self.assertEqual("Runtime type violation detected for transform input "
                        "when executing ParDoFlatMap(Combine): Tuple[Any, "
@@ -1487,9 +1482,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_per_key_pipeline_checking_satisfied(self):
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
-         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
+         | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | 'even mean' >> combine.Mean.PerKey())
+         | 'EvenMean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1499,9 +1494,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(map(str, range(5))).with_output_types(str)
-       | ('upper pair' >> beam.Map(lambda x: (x.upper(), x))
+       | ('UpperPair' >> beam.Map(lambda x: (x.upper(), x))
           .with_output_types(typehints.KV[str, str]))
-       | 'even mean' >> combine.Mean.PerKey())
+       | 'EvenMean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
@@ -1515,9 +1510,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
-         | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x))
+         | ('OddGroup' >> beam.Map(lambda x: (bool(x % 2), x))
             .with_output_types(typehints.KV[bool, int]))
-         | 'odd mean' >> combine.Mean.PerKey())
+         | 'OddMean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1530,15 +1525,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(range(5)).with_output_types(int)
-       | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2))))
+       | ('OddGroup' >> beam.Map(lambda x: (x, str(bool(x % 2))))
           .with_output_types(typehints.KV[int, str]))
-       | 'odd mean' >> combine.Mean.PerKey())
+       | 'OddMean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(odd mean/CombinePerKey(MeanCombineFn)/"
+        "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
         "Combine/ParDo(CombineValuesDoFn)): "
         "Type-hint for argument: 'p_context' violated: "
         "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
@@ -1553,8 +1548,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | 'p' >> beam.Create(range(5)).with_output_types(int)
-         | 'count int' >> combine.Count.Globally())
+         | 'P' >> beam.Create(range(5)).with_output_types(int)
+         | 'CountInt' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1564,8 +1559,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | 'p' >> beam.Create(range(5)).with_output_types(int)
-         | 'count int' >> combine.Count.Globally())
+         | 'P' >> beam.Create(range(5)).with_output_types(int)
+         | 'CountInt' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1574,9 +1569,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perkey_pipeline_type_checking_satisfied(self):
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
-         | ('even group' >> beam.Map(lambda x: (not x % 2, x))
+         | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | 'count int' >> combine.Count.PerKey())
+         | 'CountInt' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 2), (True, 3)]))
@@ -1586,7 +1581,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(range(5)).with_output_types(int)
-       | 'count int' >> combine.Count.PerKey())
+       | 'CountInt' >> combine.Count.PerKey())
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1598,9 +1593,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
-         | 'dup key' >> beam.Map(lambda x: (x, x))
+         | 'DupKey' >> beam.Map(lambda x: (x, x))
          .with_output_types(typehints.KV[str, str])
-         | 'count dups' >> combine.Count.PerKey())
+         | 'CountDups' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[str, int], d.element_type)
     assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
@@ -1609,7 +1604,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perelement_pipeline_type_checking_satisfied(self):
     d = (self.p
          | beam.Create([1, 1, 2, 3]).with_output_types(int)
-         | 'count elems' >> combine.Count.PerElement())
+         | 'CountElems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[int, int], d.element_type)
     assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
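
Count.PerElement's KV[element, count] output deduction, standalone (a sketch, same SDK):

import apache_beam as beam
from apache_beam.transforms import combine

p = beam.Pipeline('DirectRunner')
d = (p
     | beam.Create([1, 1, 2, 3]).with_output_types(int)
     | 'CountElems' >> combine.Count.PerElement())
# d.element_type is KV[int, int]: each distinct element with its count.
p.run()
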
@@ -1621,7 +1616,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | 'f' >> beam.Create([1, 1, 2, 3])
-       | 'count elems' >> combine.Count.PerElement())
+       | 'CountElems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1634,7 +1629,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     d = (self.p
          | beam.Create([True, True, False, True, True])
          .with_output_types(bool)
-         | 'count elems' >> combine.Count.PerElement())
+         | 'CountElems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 1), (True, 4)]))
@@ -1643,7 +1638,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_top_of_pipeline_checking_satisfied(self):
     d = (self.p
          | beam.Create(range(5, 11)).with_output_types(int)
-         | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
+         | 'Top 3' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[int],
                           d.element_type)
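
Top.Of with the two-argument comparator used above (this SDK's pre-2.0 comparator form), as a standalone sketch:

import apache_beam as beam
from apache_beam.transforms import combine

p = beam.Pipeline('DirectRunner')
d = (p
     | beam.Create(range(5, 11)).with_output_types(int)
     | 'Top 3' >> combine.Top.Of(3, lambda x, y: x < y))
# d.element_type is Iterable[int]: one list holding the 3 largest values.
p.run()
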
@@ -1655,7 +1650,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create(list('testing')).with_output_types(str)
-         | 'ascii top' >> combine.Top.Of(3, lambda x, y: x < y))
+         | 'AsciiTop' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[str], d.element_type)
     assert_that(d, equal_to([['t', 't', 's']]))
@@ -1665,8 +1660,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create(range(100)).with_output_types(int)
-       | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
-       | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+       | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
+       | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1676,9 +1671,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
          | beam.Create(range(100)).with_output_types(int)
-         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
+         | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+         | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1690,9 +1685,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create(range(21))
-         | ('group mod 3' >> beam.Map(lambda x: (x % 3, x))
+         | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
+         | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1702,7 +1697,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_sample_globally_pipeline_satisfied(self):
     d = (self.p
          | beam.Create([2, 2, 3, 3]).with_output_types(int)
-         | 'sample' >> combine.Sample.FixedSizeGlobally(3))
+         | 'Sample' >> combine.Sample.FixedSizeGlobally(3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
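
Sample.FixedSizeGlobally's Iterable output deduction, standalone (a sketch, same SDK):

import apache_beam as beam
from apache_beam.transforms import combine

p = beam.Pipeline('DirectRunner')
d = (p
     | beam.Create([2, 2, 3, 3]).with_output_types(int)
     | 'Sample' >> combine.Sample.FixedSizeGlobally(3))
# d.element_type is Iterable[int]: one list of 3 randomly chosen elements.
p.run()
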
 
@@ -1718,7 +1713,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     d = (self.p
          | beam.Create([2, 2, 3, 3]).with_output_types(int)
-         | 'sample' >> combine.Sample.FixedSizeGlobally(2))
+         | 'Sample' >> combine.Sample.FixedSizeGlobally(2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
 
@@ -1733,7 +1728,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     d = (self.p
          | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | 'sample' >> combine.Sample.FixedSizePerKey(2))
+         | 'Sample' >> combine.Sample.FixedSizePerKey(2))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1752,7 +1747,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     d = (self.p
          | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | 'sample' >> combine.Sample.FixedSizePerKey(1))
+         | 'Sample' >> combine.Sample.FixedSizePerKey(1))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1835,13 +1830,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(TypeError) as e:
       (self.p
        | beam.Create([1, 2, 3]).with_output_types(int)
-       | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
+       | 'Len' >> beam.Map(lambda x: len(x)).with_output_types(int))
       self.p.run()
 
     # Our special type-checking-related TypeError shouldn't have been raised.
     # Instead the above pipeline should have triggered a regular Python runtime
     # TypeError.
-    self.assertEqual("object of type 'int' has no len() [while running 'len']",
+    self.assertEqual("object of type 'int' has no len() [while running 'Len']",
                      e.exception.message)
     self.assertFalse(isinstance(e, typehints.TypeCheckError))
 
@@ -1869,7 +1864,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       _ = (self.p
            | beam.Create(['a', 'b', 'c'])
-           | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
+           | 'Ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
            | beam.GroupByKey())
 
     self.assertEqual('Input type hint violation at GroupByKey: '
@@ -1879,11 +1874,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_type_inference_command_line_flag_toggle(self):
     self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | 'c1' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'C1' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
     self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | 'c2' >> beam.Create([1, 2, 3, 4])
+    x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
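
And the flag toggle itself, standalone; the TypeOptions import path is the pre-2.0 location and is an assumption here:

import apache_beam as beam
# Assumption: pre-2.0 module path; later SDKs moved pipeline options.
from apache_beam.utils.pipeline_options import TypeOptions

p = beam.Pipeline('DirectRunner')
p.options.view_as(TypeOptions).pipeline_type_check = False
x = p | 'C1' >> beam.Create([1, 2, 3, 4])
print x.element_type  # None: type inference is skipped while the flag is off
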