You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/18 20:22:53 UTC

[2/2] beam git commit: Rename SideOutputValue to OutputValue

Rename SideOutputValue to OutputValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3418863f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3418863f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3418863f

Branch: refs/heads/master
Commit: 3418863fce834c458fb13fc82baa1c7c660030ef
Parents: 22d84c9
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 17:59:10 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 18 13:22:25 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/multiple_output_pardo.py  | 47 ++++++++++----------
 .../examples/snippets/snippets_test.py          | 30 ++++++-------
 sdks/python/apache_beam/pvalue.py               | 10 ++---
 sdks/python/apache_beam/runners/common.pxd      |  2 +-
 sdks/python/apache_beam/runners/common.py       |  6 +--
 .../runners/dataflow/dataflow_runner.py         |  8 ++--
 .../consumer_tracking_pipeline_visitor_test.py  |  2 +-
 .../runners/direct/transform_evaluator.py       | 12 ++---
 sdks/python/apache_beam/runners/runner.py       |  4 +-
 sdks/python/apache_beam/transforms/core.py      |  8 ++--
 .../apache_beam/transforms/ptransform_test.py   | 18 ++++----
 sdks/python/apache_beam/typehints/typecheck.py  |  4 +-
 12 files changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 978e4ed..b324ed1 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -18,9 +18,9 @@
 
 """A workflow demonstrating a DoFn with multiple outputs.
 
-DoFns may produce a main output and additional side outputs. These side outputs
-are marked with a tag at output time and later the same tag will be used to get
-the corresponding result (a PCollection) for that side output.
+DoFns may produce multiple outputs. Outputs that are not the default ("main")
+output are marked with a tag at output time and later the same tag will be used
+to get the corresponding result (a PCollection) for that output.
 
 This is a slightly modified version of the basic wordcount example. In this
 example words are divided into 2 buckets as shorts words (3 characters in length
@@ -68,43 +68,44 @@ class SplitLinesToWordsFn(beam.DoFn):
 
   This transform will have 3 outputs:
     - main output: all words that are longer than 3 characters.
-    - short words side output: all other words.
-    - character count side output: Number of characters in each processed line.
+    - short words output: all other words.
+    - character count output: Number of characters in each processed line.
   """
 
-  # These tags will be used to tag the side outputs of this DoFn.
-  SIDE_OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
-  SIDE_OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+  # These tags will be used to tag the outputs of this DoFn.
+  OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
 
   def process(self, element):
-    """Receives a single element (a line) and produces words and side outputs.
+    """Receives a single element (a line) and produces words and character
+    counts.
 
     Important things to note here:
       - For a single element you may produce multiple main outputs:
         words of a single line.
-      - For that same input you may produce multiple side outputs, along with
-        multiple main outputs.
-      - Side outputs may have different types (count) or may share the same type
+      - For that same input you may produce multiple outputs, potentially
+        across multiple PCollections.
+      - Outputs may have different types (count) or may share the same type
         (words) as with the main output.
 
     Args:
       element: processing element.
 
     Yields:
-      words as main output, short words as side output, line character count as
-      side output.
+      words as main output, short words as tagged output, line character count
+      as tagged output.
     """
-    # yield a count (integer) to the SIDE_OUTPUT_TAG_CHARACTER_COUNT tagged
+    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
     # collection.
-    yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
-                                 len(element))
+    yield pvalue.OutputValue(self.OUTPUT_TAG_CHARACTER_COUNT,
+                             len(element))
 
     words = re.findall(r'[A-Za-z\']+', element)
     for word in words:
       if len(word) <= 3:
-        # yield word as a side output to the SIDE_OUTPUT_TAG_SHORT_WORDS tagged
+        # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged
         # collection.
-        yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_SHORT_WORDS, word)
+        yield pvalue.OutputValue(self.OUTPUT_TAG_SHORT_WORDS, word)
       else:
         # yield word to add it to the main collection.
         yield word
@@ -144,18 +145,18 @@ def run(argv=None):
 
   lines = p | ReadFromText(known_args.input)
 
-  # with_outputs allows accessing the side outputs of a DoFn.
+  # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
   split_lines_result = (lines
                         | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-                            SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS,
-                            SplitLinesToWordsFn.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
+                            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+                            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                             main='words'))
 
   # split_lines_result is an object of type DoOutputsTuple. It supports
   # accessing result in alternative ways.
   words, _, _ = split_lines_result
   short_words = split_lines_result[
-      SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS]
+      SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
   character_count = split_lines_result.tag_character_count
 
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index c3984bb..2aee350 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -176,8 +176,8 @@ class ParDoTest(unittest.TestCase):
     # [END model_pardo_side_input_dofn]
     self.assertEqual({'a', 'bb', 'ccc'}, set(small_words))
 
-  def test_pardo_with_side_outputs(self):
-    # [START model_pardo_emitting_values_on_side_outputs]
+  def test_pardo_with_tagged_outputs(self):
+    # [START model_pardo_emitting_values_on_tagged_outputs]
     class ProcessWords(beam.DoFn):
 
       def process(self, element, cutoff_length, marker):
@@ -185,48 +185,48 @@ class ParDoTest(unittest.TestCase):
           # Emit this short word to the main output.
           yield element
         else:
-          # Emit this word's long length to a side output.
-          yield pvalue.SideOutputValue(
+          # Emit this word's long length to the 'above_cutoff_lengths' output.
+          yield pvalue.OutputValue(
               'above_cutoff_lengths', len(element))
         if element.startswith(marker):
-          # Emit this word to a different side output.
-          yield pvalue.SideOutputValue('marked strings', element)
-    # [END model_pardo_emitting_values_on_side_outputs]
+          # Emit this word to a different output with the 'marked strings' tag.
+          yield pvalue.OutputValue('marked strings', element)
+    # [END model_pardo_emitting_values_on_tagged_outputs]
 
     words = ['a', 'an', 'the', 'music', 'xyz']
 
-    # [START model_pardo_with_side_outputs]
+    # [START model_pardo_with_tagged_outputs]
     results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
                .with_outputs('above_cutoff_lengths', 'marked strings',
                              main='below_cutoff_strings'))
     below = results.below_cutoff_strings
     above = results.above_cutoff_lengths
     marked = results['marked strings']  # indexing works as well
-    # [END model_pardo_with_side_outputs]
+    # [END model_pardo_with_tagged_outputs]
 
     self.assertEqual({'a', 'an'}, set(below))
     self.assertEqual({3, 5}, set(above))
     self.assertEqual({'xyz'}, set(marked))
 
-    # [START model_pardo_with_side_outputs_iter]
+    # [START model_pardo_with_tagged_outputs_iter]
     below, above, marked = (words
                             | beam.ParDo(
                                 ProcessWords(), cutoff_length=2, marker='x')
                             .with_outputs('above_cutoff_lengths',
                                           'marked strings',
                                           main='below_cutoff_strings'))
-    # [END model_pardo_with_side_outputs_iter]
+    # [END model_pardo_with_tagged_outputs_iter]
 
     self.assertEqual({'a', 'an'}, set(below))
     self.assertEqual({3, 5}, set(above))
     self.assertEqual({'xyz'}, set(marked))
 
-  def test_pardo_with_undeclared_side_outputs(self):
+  def test_pardo_with_undeclared_outputs(self):
     numbers = [1, 2, 3, 4, 5, 10, 20]
 
-    # [START model_pardo_with_side_outputs_undeclared]
+    # [START model_pardo_with_undeclared_outputs]
     def even_odd(x):
-      yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
+      yield pvalue.OutputValue('odd' if x % 2 else 'even', x)
       if x % 10 == 0:
         yield x
 
@@ -235,7 +235,7 @@ class ParDoTest(unittest.TestCase):
     evens = results.even
     odds = results.odd
     tens = results[None]  # the undeclared main output
-    # [END model_pardo_with_side_outputs_undeclared]
+    # [END model_pardo_with_undeclared_outputs]
 
     self.assertEqual({2, 4, 10, 20}, set(evens))
     self.assertEqual({1, 3, 5}, set(odds))

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 5709b38..d873669 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -206,14 +206,14 @@ class DoOutputsTuple(object):
     elif self._tags and tag not in self._tags:
       raise ValueError(
           "Tag '%s' is neither the main tag '%s' "
-          "nor any of the side tags %s" % (
+          "nor any of the tags %s" % (
               tag, self._main_tag, self._tags))
     # Check if we accessed this tag before.
     if tag in self._pcolls:
       return self._pcolls[tag]
 
     if tag is not None:
-      self._transform.side_output_tags.add(tag)
+      self._transform.output_tags.add(tag)
       pcoll = PCollection(self._pipeline, tag=tag)
       # Transfer the producer from the DoOutputsTuple to the resulting
       # PCollection.
@@ -230,19 +230,19 @@ class DoOutputsTuple(object):
     return pcoll
 
 
-class SideOutputValue(object):
+class OutputValue(object):
   """An object representing a tagged value.
 
   ParDo, Map, and FlatMap transforms can emit values on multiple outputs which
   are distinguished by string tags. The DoFn will return plain values
-  if it wants to emit on the main output and SideOutputValue objects
+  if it wants to emit on the main output and OutputValue objects
   if it wants to emit a value on a specific tagged output.
   """
 
   def __init__(self, tag, value):
     if not isinstance(tag, basestring):
       raise TypeError(
-          'Attempting to create a SideOutputValue with non-string tag %s' % tag)
+          'Attempting to create a OutputValue with non-string tag %s' % tag)
     self.tag = tag
     self.value = value
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 781d96b..5952942 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -21,7 +21,7 @@ from apache_beam.utils.windowed_value cimport WindowedValue
 from apache_beam.metrics.execution cimport ScopedMetricsContainer
 
 
-cdef type SideOutputValue, TimestampedValue
+cdef type OutputValue, TimestampedValue
 
 
 cdef class Receiver(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 8f86b75..64d6d00 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -23,7 +23,7 @@ import sys
 
 from apache_beam.internal import util
 from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.pvalue import SideOutputValue
+from apache_beam.pvalue import OutputValue
 from apache_beam.transforms import core
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
@@ -283,14 +283,14 @@ class DoFnRunner(Receiver):
   def _process_outputs(self, windowed_input_element, results):
     """Dispatch the result of computation to the appropriate receivers.
 
-    A value wrapped in a SideOutputValue object will be unwrapped and
+    A value wrapped in an OutputValue object will be unwrapped and
     then dispatched to the appropriate indexed output.
     """
     if results is None:
       return
     for result in results:
       tag = None
-      if isinstance(result, SideOutputValue):
+      if isinstance(result, OutputValue):
         tag = result.tag
         if not isinstance(tag, basestring):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 2e9fc52..bdbd2dd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -373,7 +373,7 @@ class DataflowRunner(PipelineRunner):
         transform_node.full_label + (
             '/Do' if transform_node.side_inputs else ''),
         transform_node,
-        transform_node.transform.side_output_tags)
+        transform_node.transform.output_tags)
     fn_data = self._pardo_fn_data(transform_node, lookup_label)
     step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
     step.add_property(
@@ -384,7 +384,7 @@ class DataflowRunner(PipelineRunner):
     # Add side inputs if any.
     step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
 
-    # Generate description for main output and side outputs. The output names
+    # Generate description for the outputs. The output names
     # will be 'out' for main output and 'out_<tag>' for a tagged output.
     # Using 'out' as a tag will not clash with the name for main since it will
     # be transformed into 'out_out' internally.
@@ -397,8 +397,8 @@ class DataflowRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
          PropertyNames.ENCODING: step.encoding,
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
-    for side_tag in transform.side_output_tags:
-      # The assumption here is that side outputs will have the same typehint
+    for side_tag in transform.output_tags:
+      # The assumption here is that all outputs will have the same typehint
       # and coder as the main output. This is certainly the case right now
       # but conceivably it could change in the future.
       outputs.append(

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 154284b..3ed553e 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -76,7 +76,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
       def process(self, element):
         if element < 0:
-          yield pvalue.SideOutputValue('tag_negative', element)
+          yield pvalue.OutputValue('tag_negative', element)
         else:
           yield element
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f34513c..16b3131 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -248,13 +248,13 @@ class _TaggedReceivers(dict):
     return self._undeclared_in_memory_tag_values
 
   class NullReceiver(object):
-    """Ignores undeclared side outputs, default execution mode."""
+    """Ignores undeclared outputs, default execution mode."""
 
     def output(self, element):
       pass
 
   class InMemoryReceiver(object):
-    """Buffers undeclared side outputs to the given dictionary."""
+    """Buffers undeclared outputs to the given dictionary."""
 
     def __init__(self, target, tag):
       self._target = target
@@ -282,12 +282,12 @@ class _ParDoEvaluator(_TransformEvaluator):
     transform = self._applied_ptransform.transform
 
     self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
-    for side_output_tag in self._applied_ptransform.outputs:
-      output_pcollection = pvalue.PCollection(None, tag=side_output_tag)
+    for output_tag in self._applied_ptransform.outputs:
+      output_pcollection = pvalue.PCollection(None, tag=output_tag)
       output_pcollection.producer = self._applied_ptransform
-      self._tagged_receivers[side_output_tag] = (
+      self._tagged_receivers[output_tag] = (
           self._evaluation_context.create_bundle(output_pcollection))
-      self._tagged_receivers[side_output_tag].tag = side_output_tag
+      self._tagged_receivers[output_tag].tag = output_tag
 
     self._counter_factory = counters.CounterFactory()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 6c05951..4d33802 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -325,8 +325,8 @@ class PValueCache(object):
     except KeyError:
       if (pvalue.tag is not None
           and self.to_cache_key(pvalue.real_producer, None) in self._cache):
-        # This is an undeclared, empty side output of a DoFn executed
-        # in the local runner before this side output referenced.
+        # This is an undeclared, empty output of a DoFn executed
+        # in the local runner before this output was referenced.
         return []
       else:
         raise

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3def9ef..bdfddbb 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -184,7 +184,7 @@ class DoFn(WithTypeHints, HasDisplayData):
         trivial_inference.infer_return_type(self.process, [input_type]))
 
   def _strip_output_annotations(self, type_hint):
-    annotations = (TimestampedValue, WindowedValue, pvalue.SideOutputValue)
+    annotations = (TimestampedValue, WindowedValue, pvalue.OutputValue)
     # TODO(robertwb): These should be parameterized types that the
     # type inferencer understands.
     if (type_hint in annotations
@@ -614,7 +614,7 @@ class ParDo(PTransformWithSideInputs):
             'fn_dd': self.fn}
 
   def expand(self, pcoll):
-    self.side_output_tags = set()
+    self.output_tags = set()
     # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
     self.dofn = self.fn
     return pvalue.PCollection(pcoll.pipeline)
@@ -1154,9 +1154,9 @@ class Partition(PTransformWithSideInputs):
         raise ValueError(
             'PartitionFn specified out-of-bounds partition index: '
             '%d not in [0, %d)' % (partition, n))
-      # Each input is directed into the side output that corresponds to the
+      # Each input is directed into the output that corresponds to the
       # selected partition.
-      yield pvalue.SideOutputValue(str(partition), element)
+      yield pvalue.OutputValue(str(partition), element)
 
   def make_fn(self, fn):
     return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4da5443..b92af83 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -159,7 +159,7 @@ class PTransformTest(unittest.TestCase):
                              'is discouraged.')
     self.assertStartswith(cm.exception.message, expected_error_prefix)
 
-  def test_do_with_side_outputs_maintains_unique_name(self):
+  def test_do_with_multiple_outputs_maintains_unique_name(self):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
     r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
@@ -176,9 +176,9 @@ class PTransformTest(unittest.TestCase):
       def process(self, element):
         yield element
         if element % 2 == 0:
-          yield pvalue.SideOutputValue('even', element)
+          yield pvalue.OutputValue('even', element)
         else:
-          yield pvalue.SideOutputValue('odd', element)
+          yield pvalue.OutputValue('odd', element)
 
     pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
@@ -193,8 +193,8 @@ class PTransformTest(unittest.TestCase):
   def test_par_do_with_multiple_outputs_and_using_return(self):
     def some_fn(v):
       if v % 2 == 0:
-        return [v, pvalue.SideOutputValue('even', v)]
-      return [v, pvalue.SideOutputValue('odd', v)]
+        return [v, pvalue.OutputValue('even', v)]
+      return [v, pvalue.OutputValue('odd', v)]
 
     pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
@@ -206,12 +206,12 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
-  def test_undeclared_side_outputs(self):
+  def test_undeclared_outputs(self):
     pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
     results = nums | 'ClassifyNumbers' >> beam.FlatMap(
         lambda x: [x,
-                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+                   pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)]
     ).with_outputs()
     assert_that(results[None], equal_to([1, 2, 3, 4]))
     assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
@@ -219,12 +219,12 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
-  def test_empty_side_outputs(self):
+  def test_multiple_empty_outputs(self):
     pipeline = TestPipeline()
     nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
     results = nums | 'ClassifyNumbers' >> beam.FlatMap(
         lambda x: [x,
-                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+                   pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)]
     ).with_outputs()
     assert_that(results[None], equal_to([1, 3, 5]))
     assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')

http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 2e7176e..e475d9d 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -22,7 +22,7 @@ import inspect
 import sys
 import types
 
-from apache_beam.pvalue import SideOutputValue
+from apache_beam.pvalue import OutputValue
 from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints import check_constraint
@@ -136,7 +136,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
 
     def type_check_output(o):
       # TODO(robertwb): Multi-output.
-      x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o
+      x = o.value if isinstance(o, (OutputValue, WindowedValue)) else o
       self._type_check(self._output_type_hint, x, is_input=False)
 
     # If the return type is a generator, then we will need to interleave our