You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/18 20:22:53 UTC
[2/2] beam git commit: Rename SideOutputValue to OutputValue
Rename SideOutputValue to OutputValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3418863f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3418863f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3418863f
Branch: refs/heads/master
Commit: 3418863fce834c458fb13fc82baa1c7c660030ef
Parents: 22d84c9
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 17:59:10 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 18 13:22:25 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/multiple_output_pardo.py | 47 ++++++++++----------
.../examples/snippets/snippets_test.py | 30 ++++++-------
sdks/python/apache_beam/pvalue.py | 10 ++---
sdks/python/apache_beam/runners/common.pxd | 2 +-
sdks/python/apache_beam/runners/common.py | 6 +--
.../runners/dataflow/dataflow_runner.py | 8 ++--
.../consumer_tracking_pipeline_visitor_test.py | 2 +-
.../runners/direct/transform_evaluator.py | 12 ++---
sdks/python/apache_beam/runners/runner.py | 4 +-
sdks/python/apache_beam/transforms/core.py | 8 ++--
.../apache_beam/transforms/ptransform_test.py | 18 ++++----
sdks/python/apache_beam/typehints/typecheck.py | 4 +-
12 files changed, 76 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 978e4ed..b324ed1 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -18,9 +18,9 @@
"""A workflow demonstrating a DoFn with multiple outputs.
-DoFns may produce a main output and additional side outputs. These side outputs
-are marked with a tag at output time and later the same tag will be used to get
-the corresponding result (a PCollection) for that side output.
+DoFns may produce multiple outputs. Outputs that are not the default ("main")
+output are marked with a tag at output time and later the same tag will be used
+to get the corresponding result (a PCollection) for that output.
This is a slightly modified version of the basic wordcount example. In this
example words are divided into 2 buckets as shorts words (3 characters in length
@@ -68,43 +68,44 @@ class SplitLinesToWordsFn(beam.DoFn):
This transform will have 3 outputs:
- main output: all words that are longer than 3 characters.
- - short words side output: all other words.
- - character count side output: Number of characters in each processed line.
+ - short words output: all other words.
+ - character count output: Number of characters in each processed line.
"""
- # These tags will be used to tag the side outputs of this DoFn.
- SIDE_OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
- SIDE_OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+ # These tags will be used to tag the outputs of this DoFn.
+ OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+ OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
- """Receives a single element (a line) and produces words and side outputs.
+ """Receives a single element (a line) and produces words and character
+ counts.
Important things to note here:
- For a single element you may produce multiple main outputs:
words of a single line.
- - For that same input you may produce multiple side outputs, along with
- multiple main outputs.
- - Side outputs may have different types (count) or may share the same type
+ - For that same input you may produce multiple outputs, potentially
+ across multiple PCollections
+ - Outputs may have different types (count) or may share the same type
(words) as with the main output.
Args:
element: processing element.
Yields:
- words as main output, short words as side output, line character count as
- side output.
+ words as main output, short words as tagged output, line character count
+ as tagged output.
"""
- # yield a count (integer) to the SIDE_OUTPUT_TAG_CHARACTER_COUNT tagged
+ # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
# collection.
- yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
- len(element))
+ yield pvalue.OutputValue(self.OUTPUT_TAG_CHARACTER_COUNT,
+ len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
if len(word) <= 3:
- # yield word as a side output to the SIDE_OUTPUT_TAG_SHORT_WORDS tagged
+ # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged
# collection.
- yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_SHORT_WORDS, word)
+ yield pvalue.OutputValue(self.OUTPUT_TAG_SHORT_WORDS, word)
else:
# yield word to add it to the main collection.
yield word
@@ -144,18 +145,18 @@ def run(argv=None):
lines = p | ReadFromText(known_args.input)
- # with_outputs allows accessing the side outputs of a DoFn.
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.SIDE_OUTPUT_TAG_CHARACTER_COUNT,
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
# split_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
words, _, _ = split_lines_result
short_words = split_lines_result[
- SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS]
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
character_count = split_lines_result.tag_character_count
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index c3984bb..2aee350 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -176,8 +176,8 @@ class ParDoTest(unittest.TestCase):
# [END model_pardo_side_input_dofn]
self.assertEqual({'a', 'bb', 'ccc'}, set(small_words))
- def test_pardo_with_side_outputs(self):
- # [START model_pardo_emitting_values_on_side_outputs]
+ def test_pardo_with_tagged_outputs(self):
+ # [START model_pardo_emitting_values_on_tagged_outputs]
class ProcessWords(beam.DoFn):
def process(self, element, cutoff_length, marker):
@@ -185,48 +185,48 @@ class ParDoTest(unittest.TestCase):
# Emit this short word to the main output.
yield element
else:
- # Emit this word's long length to a side output.
- yield pvalue.SideOutputValue(
+ # Emit this word's long length to the 'above_cutoff_lengths' output.
+ yield pvalue.OutputValue(
'above_cutoff_lengths', len(element))
if element.startswith(marker):
- # Emit this word to a different side output.
- yield pvalue.SideOutputValue('marked strings', element)
- # [END model_pardo_emitting_values_on_side_outputs]
+ # Emit this word to a different output with the 'marked strings' tag.
+ yield pvalue.OutputValue('marked strings', element)
+ # [END model_pardo_emitting_values_on_tagged_outputs]
words = ['a', 'an', 'the', 'music', 'xyz']
- # [START model_pardo_with_side_outputs]
+ # [START model_pardo_with_tagged_outputs]
results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths', 'marked strings',
main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_lengths
marked = results['marked strings'] # indexing works as well
- # [END model_pardo_with_side_outputs]
+ # [END model_pardo_with_tagged_outputs]
self.assertEqual({'a', 'an'}, set(below))
self.assertEqual({3, 5}, set(above))
self.assertEqual({'xyz'}, set(marked))
- # [START model_pardo_with_side_outputs_iter]
+ # [START model_pardo_with_tagged_outputs_iter]
below, above, marked = (words
| beam.ParDo(
ProcessWords(), cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths',
'marked strings',
main='below_cutoff_strings'))
- # [END model_pardo_with_side_outputs_iter]
+ # [END model_pardo_with_tagged_outputs_iter]
self.assertEqual({'a', 'an'}, set(below))
self.assertEqual({3, 5}, set(above))
self.assertEqual({'xyz'}, set(marked))
- def test_pardo_with_undeclared_side_outputs(self):
+ def test_pardo_with_undeclared_outputs(self):
numbers = [1, 2, 3, 4, 5, 10, 20]
- # [START model_pardo_with_side_outputs_undeclared]
+ # [START model_pardo_with_undeclared_outputs]
def even_odd(x):
- yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x)
+ yield pvalue.OutputValue('odd' if x % 2 else 'even', x)
if x % 10 == 0:
yield x
@@ -235,7 +235,7 @@ class ParDoTest(unittest.TestCase):
evens = results.even
odds = results.odd
tens = results[None] # the undeclared main output
- # [END model_pardo_with_side_outputs_undeclared]
+ # [END model_pardo_with_undeclared_outputs]
self.assertEqual({2, 4, 10, 20}, set(evens))
self.assertEqual({1, 3, 5}, set(odds))
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 5709b38..d873669 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -206,14 +206,14 @@ class DoOutputsTuple(object):
elif self._tags and tag not in self._tags:
raise ValueError(
"Tag '%s' is neither the main tag '%s' "
- "nor any of the side tags %s" % (
+ "nor any of the tags %s" % (
tag, self._main_tag, self._tags))
# Check if we accessed this tag before.
if tag in self._pcolls:
return self._pcolls[tag]
if tag is not None:
- self._transform.side_output_tags.add(tag)
+ self._transform.output_tags.add(tag)
pcoll = PCollection(self._pipeline, tag=tag)
# Transfer the producer from the DoOutputsTuple to the resulting
# PCollection.
@@ -230,19 +230,19 @@ class DoOutputsTuple(object):
return pcoll
-class SideOutputValue(object):
+class OutputValue(object):
"""An object representing a tagged value.
ParDo, Map, and FlatMap transforms can emit values on multiple outputs which
are distinguished by string tags. The DoFn will return plain values
- if it wants to emit on the main output and SideOutputValue objects
+ if it wants to emit on the main output and OutputValue objects
if it wants to emit a value on a specific tagged output.
"""
def __init__(self, tag, value):
if not isinstance(tag, basestring):
raise TypeError(
- 'Attempting to create a SideOutputValue with non-string tag %s' % tag)
+ 'Attempting to create a OutputValue with non-string tag %s' % tag)
self.tag = tag
self.value = value
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 781d96b..5952942 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -21,7 +21,7 @@ from apache_beam.utils.windowed_value cimport WindowedValue
from apache_beam.metrics.execution cimport ScopedMetricsContainer
-cdef type SideOutputValue, TimestampedValue
+cdef type OutputValue, TimestampedValue
cdef class Receiver(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 8f86b75..64d6d00 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -23,7 +23,7 @@ import sys
from apache_beam.internal import util
from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.pvalue import SideOutputValue
+from apache_beam.pvalue import OutputValue
from apache_beam.transforms import core
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
@@ -283,14 +283,14 @@ class DoFnRunner(Receiver):
def _process_outputs(self, windowed_input_element, results):
"""Dispatch the result of computation to the appropriate receivers.
- A value wrapped in a SideOutputValue object will be unwrapped and
+ A value wrapped in an OutputValue object will be unwrapped and
then dispatched to the appropriate indexed output.
"""
if results is None:
return
for result in results:
tag = None
- if isinstance(result, SideOutputValue):
+ if isinstance(result, OutputValue):
tag = result.tag
if not isinstance(tag, basestring):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 2e9fc52..bdbd2dd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -373,7 +373,7 @@ class DataflowRunner(PipelineRunner):
transform_node.full_label + (
'/Do' if transform_node.side_inputs else ''),
transform_node,
- transform_node.transform.side_output_tags)
+ transform_node.transform.output_tags)
fn_data = self._pardo_fn_data(transform_node, lookup_label)
step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
step.add_property(
@@ -384,7 +384,7 @@ class DataflowRunner(PipelineRunner):
# Add side inputs if any.
step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
- # Generate description for main output and side outputs. The output names
+ # Generate description for the outputs. The output names
# will be 'out' for main output and 'out_<tag>' for a tagged output.
# Using 'out' as a tag will not clash with the name for main since it will
# be transformed into 'out_out' internally.
@@ -397,8 +397,8 @@ class DataflowRunner(PipelineRunner):
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
- for side_tag in transform.side_output_tags:
- # The assumption here is that side outputs will have the same typehint
+ for side_tag in transform.output_tags:
+ # The assumption here is that all outputs will have the same typehint
# and coder as the main output. This is certainly the case right now
# but conceivably it could change in the future.
outputs.append(
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 154284b..3ed553e 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -76,7 +76,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
def process(self, element):
if element < 0:
- yield pvalue.SideOutputValue('tag_negative', element)
+ yield pvalue.OutputValue('tag_negative', element)
else:
yield element
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f34513c..16b3131 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -248,13 +248,13 @@ class _TaggedReceivers(dict):
return self._undeclared_in_memory_tag_values
class NullReceiver(object):
- """Ignores undeclared side outputs, default execution mode."""
+ """Ignores undeclared outputs, default execution mode."""
def output(self, element):
pass
class InMemoryReceiver(object):
- """Buffers undeclared side outputs to the given dictionary."""
+ """Buffers undeclared outputs to the given dictionary."""
def __init__(self, target, tag):
self._target = target
@@ -282,12 +282,12 @@ class _ParDoEvaluator(_TransformEvaluator):
transform = self._applied_ptransform.transform
self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
- for side_output_tag in self._applied_ptransform.outputs:
- output_pcollection = pvalue.PCollection(None, tag=side_output_tag)
+ for output_tag in self._applied_ptransform.outputs:
+ output_pcollection = pvalue.PCollection(None, tag=output_tag)
output_pcollection.producer = self._applied_ptransform
- self._tagged_receivers[side_output_tag] = (
+ self._tagged_receivers[output_tag] = (
self._evaluation_context.create_bundle(output_pcollection))
- self._tagged_receivers[side_output_tag].tag = side_output_tag
+ self._tagged_receivers[output_tag].tag = output_tag
self._counter_factory = counters.CounterFactory()
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 6c05951..4d33802 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -325,8 +325,8 @@ class PValueCache(object):
except KeyError:
if (pvalue.tag is not None
and self.to_cache_key(pvalue.real_producer, None) in self._cache):
- # This is an undeclared, empty side output of a DoFn executed
- # in the local runner before this side output referenced.
+ # This is an undeclared, empty output of a DoFn executed
+ # in the local runner before this output was referenced.
return []
else:
raise
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3def9ef..bdfddbb 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -184,7 +184,7 @@ class DoFn(WithTypeHints, HasDisplayData):
trivial_inference.infer_return_type(self.process, [input_type]))
def _strip_output_annotations(self, type_hint):
- annotations = (TimestampedValue, WindowedValue, pvalue.SideOutputValue)
+ annotations = (TimestampedValue, WindowedValue, pvalue.OutputValue)
# TODO(robertwb): These should be parameterized types that the
# type inferencer understands.
if (type_hint in annotations
@@ -614,7 +614,7 @@ class ParDo(PTransformWithSideInputs):
'fn_dd': self.fn}
def expand(self, pcoll):
- self.side_output_tags = set()
+ self.output_tags = set()
# TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
self.dofn = self.fn
return pvalue.PCollection(pcoll.pipeline)
@@ -1154,9 +1154,9 @@ class Partition(PTransformWithSideInputs):
raise ValueError(
'PartitionFn specified out-of-bounds partition index: '
'%d not in [0, %d)' % (partition, n))
- # Each input is directed into the side output that corresponds to the
+ # Each input is directed into the output that corresponds to the
# selected partition.
- yield pvalue.SideOutputValue(str(partition), element)
+ yield pvalue.OutputValue(str(partition), element)
def make_fn(self, fn):
return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4da5443..b92af83 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -159,7 +159,7 @@ class PTransformTest(unittest.TestCase):
'is discouraged.')
self.assertStartswith(cm.exception.message, expected_error_prefix)
- def test_do_with_side_outputs_maintains_unique_name(self):
+ def test_do_with_multiple_outputs_maintains_unique_name(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
@@ -176,9 +176,9 @@ class PTransformTest(unittest.TestCase):
def process(self, element):
yield element
if element % 2 == 0:
- yield pvalue.SideOutputValue('even', element)
+ yield pvalue.OutputValue('even', element)
else:
- yield pvalue.SideOutputValue('odd', element)
+ yield pvalue.OutputValue('odd', element)
pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
@@ -193,8 +193,8 @@ class PTransformTest(unittest.TestCase):
def test_par_do_with_multiple_outputs_and_using_return(self):
def some_fn(v):
if v % 2 == 0:
- return [v, pvalue.SideOutputValue('even', v)]
- return [v, pvalue.SideOutputValue('odd', v)]
+ return [v, pvalue.OutputValue('even', v)]
+ return [v, pvalue.OutputValue('odd', v)]
pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
@@ -206,12 +206,12 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
@attr('ValidatesRunner')
- def test_undeclared_side_outputs(self):
+ def test_undeclared_outputs(self):
pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
results = nums | 'ClassifyNumbers' >> beam.FlatMap(
lambda x: [x,
- pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)]
).with_outputs()
assert_that(results[None], equal_to([1, 2, 3, 4]))
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
@@ -219,12 +219,12 @@ class PTransformTest(unittest.TestCase):
pipeline.run()
@attr('ValidatesRunner')
- def test_empty_side_outputs(self):
+ def test_multiple_empty_outputs(self):
pipeline = TestPipeline()
nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
results = nums | 'ClassifyNumbers' >> beam.FlatMap(
lambda x: [x,
- pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)]
).with_outputs()
assert_that(results[None], equal_to([1, 3, 5]))
assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 2e7176e..e475d9d 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -22,7 +22,7 @@ import inspect
import sys
import types
-from apache_beam.pvalue import SideOutputValue
+from apache_beam.pvalue import OutputValue
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.window import WindowedValue
from apache_beam.typehints import check_constraint
@@ -136,7 +136,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
def type_check_output(o):
# TODO(robertwb): Multi-output.
- x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o
+ x = o.value if isinstance(o, (OutputValue, WindowedValue)) else o
self._type_check(self._output_type_hint, x, is_input=False)
# If the return type is a generator, then we will need to interleave our