You are viewing a plain-text version of this content. The canonical link for it was given as a "here" hyperlink that was lost in the plain-text conversion.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/03/31 19:11:42 UTC
[1/2] beam git commit: Closes #2388
Repository: beam
Updated Branches:
refs/heads/master 132d3c5f6 -> affb926cc
Closes #2388
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/affb926c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/affb926c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/affb926c
Branch: refs/heads/master
Commit: affb926ccd9302734805d8c0db418d006ee37672
Parents: 132d3c5 207de81
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Mar 31 12:11:00 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Mar 31 12:11:00 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 12 +-
sdks/python/apache_beam/pvalue.py | 258 +++++++------------
sdks/python/apache_beam/pvalue_test.py | 33 ---
.../runners/dataflow/dataflow_runner.py | 29 ++-
.../runners/direct/bundle_factory.py | 3 +-
.../consumer_tracking_pipeline_visitor.py | 11 +-
.../consumer_tracking_pipeline_visitor_test.py | 4 +-
.../runners/direct/evaluation_context.py | 60 +++--
.../apache_beam/runners/direct/executor.py | 7 +-
.../runners/direct/transform_evaluator.py | 51 +---
sdks/python/apache_beam/transforms/core.py | 2 +-
.../python/apache_beam/transforms/ptransform.py | 4 +-
.../python/apache_beam/transforms/sideinputs.py | 132 ----------
.../apache_beam/transforms/sideinputs_test.py | 91 +++----
14 files changed, 198 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Change side inputs to be references rather than full PValues.
Posted by ro...@apache.org.
Change side inputs to be references rather than full PValues.
This is more consistent with the Runner API's structure.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/207de81b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/207de81b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/207de81b
Branch: refs/heads/master
Commit: 207de81bca4c3761cf663d32f9b95a022ef97165
Parents: 132d3c5
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Mar 30 08:20:21 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Mar 31 12:11:00 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 12 +-
sdks/python/apache_beam/pvalue.py | 258 +++++++------------
sdks/python/apache_beam/pvalue_test.py | 33 ---
.../runners/dataflow/dataflow_runner.py | 29 ++-
.../runners/direct/bundle_factory.py | 3 +-
.../consumer_tracking_pipeline_visitor.py | 11 +-
.../consumer_tracking_pipeline_visitor_test.py | 4 +-
.../runners/direct/evaluation_context.py | 60 +++--
.../apache_beam/runners/direct/executor.py | 7 +-
.../runners/direct/transform_evaluator.py | 51 +---
sdks/python/apache_beam/transforms/core.py | 2 +-
.../python/apache_beam/transforms/ptransform.py | 4 +-
.../python/apache_beam/transforms/sideinputs.py | 132 ----------
.../apache_beam/transforms/sideinputs_test.py | 91 +++----
14 files changed, 198 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index be2a79d..ee5904b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -142,9 +142,6 @@ class Pipeline(object):
# If a transform is applied and the full label is already in the set
# then the transform will have to be cloned with a new label.
self.applied_labels = set()
- # Store cache of views created from PCollections. For reference, see
- # pvalue._cache_view().
- self._view_cache = {}
def _current_transform(self):
"""Returns the transform currently on the top of the stack."""
@@ -271,8 +268,8 @@ class Pipeline(object):
result.producer = current
# TODO(robertwb): Multi-input, multi-output inference.
# TODO(robertwb): Ideally we'd do intersection here.
- if (type_options is not None and type_options.pipeline_type_check and
- isinstance(result, (pvalue.PCollection, pvalue.PCollectionView))
+ if (type_options is not None and type_options.pipeline_type_check
+ and isinstance(result, pvalue.PCollection)
and not result.element_type):
input_element_type = (
inputs[0].element_type
@@ -416,7 +413,7 @@ class AppliedPTransform(object):
if not isinstance(main_input, pvalue.PBegin):
real_producer(main_input).refcounts[main_input.tag] += 1
for side_input in self.side_inputs:
- real_producer(side_input).refcounts[side_input.tag] += 1
+ real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1
def add_output(self, output, tag=None):
if isinstance(output, pvalue.DoOutputsTuple):
@@ -456,7 +453,8 @@ class AppliedPTransform(object):
# Visit side inputs.
for pval in self.side_inputs:
- if isinstance(pval, pvalue.PCollectionView) and pval not in visited:
+ if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
+ pval = pval.pvalue # Unpack marker-object-wrapped pvalue.
assert pval.producer is not None
pval.producer.visit(visitor, pipeline, visited)
# The value should be visited now since we visit outputs too.
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 4114b3f..bfe1745 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -26,9 +26,10 @@ produced when the pipeline gets executed.
from __future__ import absolute_import
-import collections
import itertools
+from apache_beam import typehints
+
class PValue(object):
"""Base class for PCollection.
@@ -250,20 +251,22 @@ class SideOutputValue(object):
self.value = value
-class PCollectionView(PValue):
- """An immutable view of a PCollection that can be used as a side input."""
+class AsSideInput(object):
+ """Marker specifying that a PCollection will be used as a side input.
- def __init__(self, pipeline, window_mapping_fn):
- """Initializes a PCollectionView. Do not call directly."""
- super(PCollectionView, self).__init__(pipeline)
- self._window_mapping_fn = window_mapping_fn
+ When a PCollection is supplied as a side input to a PTransform, it is
+ necessary to indicate how the PCollection should be made available
+ as a PTransform side argument (e.g. in the form of an iterable, mapping,
+ or single value). This class is the superclass of all the various
+ options, and should not be instantiated directly. (See instead AsSingleton,
+ AsIter, etc.)
+ """
- @property
- def windowing(self):
- if not hasattr(self, '_windowing'):
- self._windowing = self.producer.transform.get_windowing(
- self.producer.inputs)
- return self._windowing
+ def __init__(self, pcoll):
+ from apache_beam.transforms import sideinputs
+ self.pvalue = pcoll
+ self._window_mapping_fn = sideinputs.default_window_mapping_fn(
+ pcoll.windowing.windowfn)
def _view_options(self):
"""Internal options corresponding to specific view.
@@ -275,19 +278,39 @@ class PCollectionView(PValue):
"""
return {'window_mapping_fn': self._window_mapping_fn}
+ @property
+ def element_type(self):
+ return typehints.Any
+
+
+class AsSingleton(AsSideInput):
+ """Marker specifying that an entire PCollection is to be used as a side input.
+
+ When a PCollection is supplied as a side input to a PTransform, it is
+ necessary to indicate whether the entire PCollection should be made available
+ as a PTransform side argument (in the form of an iterable), or whether just
+ one value should be pulled from the PCollection and supplied as the side
+ argument (as an ordinary value).
-class SingletonPCollectionView(PCollectionView):
- """A PCollectionView that contains a single object."""
+ Wrapping a PCollection side input argument to a PTransform in this container
+ (e.g., data.apply('label', MyPTransform(), AsSingleton(my_side_input) )
+ selects the latter behavor.
- def __init__(self, pipeline, has_default, default_value,
- window_mapping_fn):
- super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn)
- self.has_default = has_default
+ The input PCollection must contain exactly one value per window, unless a
+ default is given, in which case it may be empty.
+ """
+ _NO_DEFAULT = object()
+
+ def __init__(self, pcoll, default_value=_NO_DEFAULT):
+ super(AsSingleton, self).__init__(pcoll)
self.default_value = default_value
+ def __repr__(self):
+ return 'AsSingleton(%s)' % self.pvalue
+
def _view_options(self):
- base = super(SingletonPCollectionView, self)._view_options()
- if self.has_default:
+ base = super(AsSingleton, self)._view_options()
+ if self.default_value != AsSingleton._NO_DEFAULT:
return dict(base, default=self.default_value)
else:
return base
@@ -304,182 +327,83 @@ class SingletonPCollectionView(PCollectionView):
'PCollection with more than one element accessed as '
'a singleton view.')
-
-class IterablePCollectionView(PCollectionView):
- """A PCollectionView that can be treated as an iterable."""
-
- @staticmethod
- def _from_runtime_iterable(it, options):
- return it
+ @property
+ def element_type(self):
+ return self.pvalue.element_type
-class ListPCollectionView(PCollectionView):
- """A PCollectionView that can be treated as a list."""
+class AsIter(AsSideInput):
+ """Marker specifying that an entire PCollection is to be used as a side input.
- @staticmethod
- def _from_runtime_iterable(it, options):
- return list(it)
+ When a PCollection is supplied as a side input to a PTransform, it is
+ necessary to indicate whether the entire PCollection should be made available
+ as a PTransform side argument (in the form of an iterable), or whether just
+ one value should be pulled from the PCollection and supplied as the side
+ argument (as an ordinary value).
+ Wrapping a PCollection side input argument to a PTransform in this container
+ (e.g., data.apply('label', MyPTransform(), AsIter(my_side_input) ) selects the
+ former behavor.
+ """
-class DictPCollectionView(PCollectionView):
- """A PCollectionView that can be treated as a dict."""
+ def __repr__(self):
+ return 'AsIter(%s)' % self.pvalue
@staticmethod
def _from_runtime_iterable(it, options):
- return dict(it)
-
-
-def _get_cached_view(pipeline, key):
- return pipeline._view_cache.get(key, None) # pylint: disable=protected-access
-
-
-def _cache_view(pipeline, key, view):
- pipeline._view_cache[key] = view # pylint: disable=protected-access
-
-
-def _format_view_label(pcoll):
- # The monitoring UI doesn't like '/' character in transform labels.
- if not pcoll.producer:
- return str(pcoll.tag)
- return '%s.%s' % (pcoll.producer.full_label.replace('/', '|'),
- pcoll.tag)
-
-
-_SINGLETON_NO_DEFAULT = object()
-
+ return it
-def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pylint: disable=invalid-name
- """Create a SingletonPCollectionView from the contents of input PCollection.
+ @property
+ def element_type(self):
+ return typehints.Iterable[self.pvalue.element_type]
- The input PCollection should contain at most one element (per window) and the
- resulting PCollectionView can then be used as a side input to PTransforms. If
- the PCollectionView is empty (for a given window), the side input value will
- be the default_value, if specified; otherwise, it will be an EmptySideInput
- object.
- Args:
- pcoll: Input pcollection.
- default_value: Default value for the singleton view.
- label: Label to be specified if several AsSingleton's with different
- defaults for the same PCollection.
+class AsList(AsSideInput):
+ """Marker specifying that an entire PCollection is to be used as a side input.
- Returns:
- A singleton PCollectionView containing the element as above.
- """
- label = label or _format_view_label(pcoll)
- has_default = default_value is not _SINGLETON_NO_DEFAULT
- if not has_default:
- default_value = None
-
- # Don't recreate the view if it was already created.
- hashable_default_value = ('val', default_value)
- if not isinstance(default_value, collections.Hashable):
- # Massage default value to treat as hash key.
- hashable_default_value = ('id', id(default_value))
- cache_key = (pcoll, AsSingleton, has_default, hashable_default_value)
- cached_view = _get_cached_view(pcoll.pipeline, cache_key)
- if cached_view:
- return cached_view
-
- # Local import is required due to dependency loop; even though the
- # implementation of this function requires concepts defined in modules that
- # depend on pvalue, it lives in this module to reduce user workload.
- from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position
- view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value,
- label=label))
- _cache_view(pcoll.pipeline, cache_key, view)
- return view
-
-
-def AsIter(pcoll, label=None): # pylint: disable=invalid-name
- """Create an IterablePCollectionView from the elements of input PCollection.
-
- The contents of the given PCollection will be available as an iterable in
- PTransforms that use the returned PCollectionView as a side input.
+ Intended for use in side-argument specification---the same places where
+ AsSingleton and AsIter are used, but forces materialization of this
+ PCollection as a list.
Args:
pcoll: Input pcollection.
- label: Label to be specified if several AsIter's for the same PCollection.
Returns:
- An iterable PCollectionView containing the elements as above.
+ An AsList-wrapper around a PCollection whose one element is a list
+ containing all elements in pcoll.
"""
- label = label or _format_view_label(pcoll)
-
- # Don't recreate the view if it was already created.
- cache_key = (pcoll, AsIter)
- cached_view = _get_cached_view(pcoll.pipeline, cache_key)
- if cached_view:
- return cached_view
- # Local import is required due to dependency loop; even though the
- # implementation of this function requires concepts defined in modules that
- # depend on pvalue, it lives in this module to reduce user workload.
- from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position
- view = (pcoll | sideinputs.ViewAsIterable(label=label))
- _cache_view(pcoll.pipeline, cache_key, view)
- return view
+ @staticmethod
+ def _from_runtime_iterable(it, options):
+ return list(it)
-def AsList(pcoll, label=None): # pylint: disable=invalid-name
- """Create a ListPCollectionView from the elements of input PCollection.
+class AsDict(AsSideInput):
+ """Marker specifying a PCollection to be used as an indexable side input.
- The contents of the given PCollection will be available as a list-like object
- in PTransforms that use the returned PCollectionView as a side input.
+ Intended for use in side-argument specification---the same places where
+ AsSingleton and AsIter are used, but returns an interface that allows
+ key lookup.
Args:
- pcoll: Input pcollection.
- label: Label to be specified if several AsList's for the same PCollection.
+ pcoll: Input pcollection. All elements should be key-value pairs (i.e.
+ 2-tuples) with unique keys.
Returns:
- A list PCollectionView containing the elements as above.
+ An AsDict-wrapper around a PCollection whose one element is a dict with
+ entries for uniquely-keyed pairs in pcoll.
"""
- label = label or _format_view_label(pcoll)
-
- # Don't recreate the view if it was already created.
- cache_key = (pcoll, AsList)
- cached_view = _get_cached_view(pcoll.pipeline, cache_key)
- if cached_view:
- return cached_view
-
- # Local import is required due to dependency loop; even though the
- # implementation of this function requires concepts defined in modules that
- # depend on pvalue, it lives in this module to reduce user workload.
- from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position
- view = (pcoll | sideinputs.ViewAsList(label=label))
- _cache_view(pcoll.pipeline, cache_key, view)
- return view
-
-def AsDict(pcoll, label=None): # pylint: disable=invalid-name
- """Create a DictPCollectionView from the elements of input PCollection.
+ @staticmethod
+ def _from_runtime_iterable(it, options):
+ return dict(it)
- The contents of the given PCollection whose elements are 2-tuples of key and
- value will be available as a dict-like object in PTransforms that use the
- returned PCollectionView as a side input.
- Args:
- pcoll: Input pcollection containing 2-tuples of key and value.
- label: Label to be specified if several AsDict's for the same PCollection.
-
- Returns:
- A dict PCollectionView containing the dict as above.
- """
- label = label or _format_view_label(pcoll)
-
- # Don't recreate the view if it was already created.
- cache_key = (pcoll, AsDict)
- cached_view = _get_cached_view(pcoll.pipeline, cache_key)
- if cached_view:
- return cached_view
-
- # Local import is required due to dependency loop; even though the
- # implementation of this function requires concepts defined in modules that
- # depend on pvalue, it lives in this module to reduce user workload.
- from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position
- view = (pcoll | sideinputs.ViewAsDict(label=label))
- _cache_view(pcoll.pipeline, cache_key, view)
- return view
+# For backwards compatibility with worker code.
+SingletonPCollectionView = AsSingleton
+IterablePCollectionView = AsIter
+ListPCollectionView = AsList
+DictPCollectionView = AsDict
class EmptySideInput(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 86f1987..529ddf7 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -19,22 +19,8 @@
import unittest
-from apache_beam.pipeline import Pipeline
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
from apache_beam.pvalue import PValue
from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-
-
-class FakePipeline(Pipeline):
- """Fake pipeline object used to check if apply() receives correct args."""
-
- def apply(self, *args, **kwargs):
- self.args = args
- self.kwargs = kwargs
class PValueTest(unittest.TestCase):
@@ -44,25 +30,6 @@ class PValueTest(unittest.TestCase):
value = PValue(pipeline)
self.assertEqual(pipeline, value.pipeline)
- def test_pcollectionview_not_recreated(self):
- pipeline = TestPipeline()
- value = pipeline | 'create1' >> Create([1, 2, 3])
- value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
- value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
- self.assertEqual(AsSingleton(value), AsSingleton(value))
- self.assertEqual(AsSingleton(value, default_value=1, label='new'),
- AsSingleton(value, default_value=1, label='new'))
- self.assertNotEqual(AsSingleton(value),
- AsSingleton(value, default_value=1, label='new'))
- self.assertEqual(AsIter(value), AsIter(value))
- self.assertEqual(AsList(value), AsList(value))
- self.assertEqual(AsDict(value2), AsDict(value2))
-
- self.assertNotEqual(AsSingleton(value), AsSingleton(value2))
- self.assertNotEqual(AsIter(value), AsIter(value2))
- self.assertNotEqual(AsList(value), AsList(value2))
- self.assertNotEqual(AsDict(value2), AsDict(value3))
-
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index a82671c..db433df 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -31,7 +31,7 @@ from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal import pickler
from apache_beam.internal.gcp import json_value
-from apache_beam.pvalue import PCollectionView
+from apache_beam.pvalue import AsSideInput
from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
@@ -284,23 +284,26 @@ class DataflowRunner(PipelineRunner):
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
- def run_CreatePCollectionView(self, transform_node):
- step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
- transform_node.full_label, transform_node)
- input_tag = transform_node.inputs[0].tag
- input_step = self._cache.get_pvalue(transform_node.inputs[0])
+ def _add_singleton_step(self, label, full_label, tag, input_step):
+ """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
+ # Import here to avoid adding the dependency for local running scenarios.
+ from google.cloud.dataflow.internal import apiclient
+ step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
+ self.job.proto.steps.append(step.proto)
+ step.add_property(PropertyNames.USER_NAME, full_label)
step.add_property(
PropertyNames.PARALLEL_INPUT,
{'@type': 'OutputReference',
PropertyNames.STEP_NAME: input_step.proto.name,
- PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+ PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
step.encoding = self._get_side_input_encoding(input_step.encoding)
step.add_property(
PropertyNames.OUTPUT_INFO,
[{PropertyNames.USER_NAME: (
- '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+ '%s.%s' % (full_label, PropertyNames.OUTPUT)),
PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+ PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+ return step
def run_Flatten(self, transform_node):
step = self._add_step(TransformNames.FLATTEN,
@@ -375,8 +378,12 @@ class DataflowRunner(PipelineRunner):
si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
lookup_label = lambda side_pval: si_labels[side_pval]
for side_pval in transform_node.side_inputs:
- assert isinstance(side_pval, PCollectionView)
- si_label = lookup_label(side_pval)
+ assert isinstance(side_pval, AsSideInput)
+ si_label = self._get_unique_step_name()
+ si_full_label = '%s/%s' % (transform_node.full_label, si_label)
+ self._add_singleton_step(
+ si_label, si_full_label, side_pval.pvalue.tag,
+ self._cache.get_pvalue(side_pval.pvalue))
si_dict[si_label] = {
'@type': 'OutputReference',
PropertyNames.STEP_NAME: si_label,
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 63319af..647b5f2 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -106,8 +106,7 @@ class Bundle(object):
self._initial_windowed_value.windows)
def __init__(self, pcollection, stacked=True):
- assert (isinstance(pcollection, pvalue.PCollection)
- or isinstance(pcollection, pvalue.PCollectionView))
+ assert isinstance(pcollection, pvalue.PCollection)
self._pcollection = pcollection
self._elements = []
self._stacked = stacked
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
index 6f1757a..cdfadb7 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -34,18 +34,13 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
def __init__(self):
self.value_to_consumers = {} # Map from PValue to [AppliedPTransform].
self.root_transforms = set() # set of (root) AppliedPTransforms.
- self.views = [] # list of PCollectionViews.
+ self.views = [] # list of side inputs.
self.step_names = {} # Map from AppliedPTransform to String.
self._num_transforms = 0
- def visit_value(self, value, producer_node):
- if value:
- if isinstance(value, pvalue.PCollectionView):
- self.views.append(value)
-
def visit_transform(self, applied_ptransform):
- inputs = applied_ptransform.inputs
+ inputs = list(applied_ptransform.inputs)
if inputs:
for input_value in inputs:
if isinstance(input_value, pvalue.PBegin):
@@ -57,3 +52,5 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
self.root_transforms.add(applied_ptransform)
self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
self._num_transforms += 1
+ for side_input in applied_ptransform.side_inputs:
+ self.views.append(side_input)
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 73b897f..eb8b14b 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -102,10 +102,10 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
root_transforms = sorted(
[t.transform for t in self.visitor.root_transforms])
self.assertEqual(root_transforms, sorted([root_create]))
- self.assertEqual(len(self.visitor.step_names), 4)
+ self.assertEqual(len(self.visitor.step_names), 3)
self.assertEqual(len(self.visitor.views), 1)
self.assertTrue(isinstance(self.visitor.views[0],
- pvalue.ListPCollectionView))
+ pvalue.AsList))
def test_co_group_by_key(self):
emails = self.pipeline | 'email' >> Create([('joe', 'joe@example.com')])
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 7a7f318..8114104 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -50,12 +50,13 @@ class _SideInputView(object):
def __init__(self, view):
self._view = view
self.callable_queue = collections.deque()
+ self.elements = []
self.value = None
self.has_result = False
class _SideInputsContainer(object):
- """An in-process container for PCollectionViews.
+ """An in-process container for side inputs.
It provides methods for blocking until a side-input is available and writing
to a side input.
@@ -67,21 +68,28 @@ class _SideInputsContainer(object):
for view in views:
self._views[view] = _SideInputView(view)
- def get_value_or_schedule_after_output(self, pcollection_view, task):
+ def get_value_or_schedule_after_output(self, side_input, task):
with self._lock:
- view = self._views[pcollection_view]
+ view = self._views[side_input]
if not view.has_result:
view.callable_queue.append(task)
task.blocked = True
return (view.has_result, view.value)
- def set_value_and_get_callables(self, pcollection_view, values):
+ def add_values(self, side_input, values):
with self._lock:
- view = self._views[pcollection_view]
+ view = self._views[side_input]
+ assert not view.has_result
+ view.elements.extend(values)
+
+ def finalize_value_and_get_tasks(self, side_input):
+ with self._lock:
+ view = self._views[side_input]
assert not view.has_result
assert view.value is None
assert view.callable_queue is not None
- view.value = self._pvalue_to_value(pcollection_view, values)
+ view.value = self._pvalue_to_value(side_input, view.elements)
+ view.elements = None
result = tuple(view.callable_queue)
for task in result:
task.blocked = False
@@ -90,10 +98,10 @@ class _SideInputsContainer(object):
return result
def _pvalue_to_value(self, view, values):
- """Given a PCollectionView, returns the associated value in requested form.
+ """Given a side input view, returns the associated value in requested form.
Args:
- view: PCollectionView for the requested side input.
+ view: SideInput for the requested side input.
values: Iterable values associated with the side input.
Returns:
@@ -115,9 +123,9 @@ class EvaluationContext(object):
EvaluationContext contains shared state for an execution of the
DirectRunner that can be used while evaluating a PTransform. This
consists of views into underlying state and watermark implementations, access
- to read and write PCollectionViews, and constructing counter sets and
+ to read and write side inputs, and constructing counter sets and
execution contexts. This includes executing callbacks asynchronously when
- state changes to the appropriate point (e.g. when a PCollectionView is
+ state changes to the appropriate point (e.g. when a side input is
requested and known to be empty).
EvaluationContext also handles results by committing finalizing
@@ -134,6 +142,9 @@ class EvaluationContext(object):
self._value_to_consumers = value_to_consumers
self._step_names = step_names
self.views = views
+ self._pcollection_to_views = collections.defaultdict(list)
+ for view in views:
+ self._pcollection_to_views[view.pvalue].append(view)
# AppliedPTransform -> Evaluator specific state objects
self._application_state_interals = {}
@@ -198,17 +209,20 @@ class EvaluationContext(object):
# If the result is for a view, update side inputs container.
if (result.output_bundles
- and result.output_bundles[0].pcollection in self.views):
- if committed_bundles:
- assert len(committed_bundles) == 1
- # side_input must be materialized.
- side_input_result = committed_bundles[0].get_elements_iterable(
- make_copy=True)
- else:
- side_input_result = []
- tasks = self._side_inputs_container.set_value_and_get_callables(
- result.output_bundles[0].pcollection, side_input_result)
- self._pending_unblocked_tasks.extend(tasks)
+ and result.output_bundles[0].pcollection
+ in self._pcollection_to_views):
+ for view in self._pcollection_to_views[
+ result.output_bundles[0].pcollection]:
+ for committed_bundle in committed_bundles:
+ # side_input must be materialized.
+ self._side_inputs_container.add_values(
+ view,
+ committed_bundle.get_elements_iterable(make_copy=True))
+ if (self.get_execution_context(result.transform)
+ .watermarks.input_watermark
+ == WatermarkManager.WATERMARK_POS_INF):
+ self._pending_unblocked_tasks.extend(
+ self._side_inputs_container.finalize_value_and_get_tasks(view))
if result.counters:
for counter in result.counters:
@@ -277,7 +291,7 @@ class EvaluationContext(object):
tw = self._watermark_manager.get_watermarks(transform)
return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
- def get_value_or_schedule_after_output(self, pcollection_view, task):
+ def get_value_or_schedule_after_output(self, side_input, task):
assert isinstance(task, TransformExecutor)
return self._side_inputs_container.get_value_or_schedule_after_output(
- pcollection_view, task)
+ side_input, task)
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 27b6f2f..ce6356c 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -76,6 +76,7 @@ class ExecutorService(object):
return None
def run(self):
+
while not self.shutdown_requested:
task = self._get_task_or_none()
if task:
@@ -249,9 +250,9 @@ class _TimerCompletionCallback(_CompletionCallback):
class TransformExecutor(ExecutorService.CallableTask):
"""TransformExecutor will evaluate a bundle using an applied ptransform.
- A CallableTask responsible for constructing a TransformEvaluator andevaluating
- it on some bundle of input, and registering the result using the completion
- callback.
+ A CallableTask responsible for constructing a TransformEvaluator and
+ evaluating it on some bundle of input, and registering the result using the
+ completion callback.
"""
def __init__(self, transform_evaluator_registry, evaluation_context,
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f9a0692..0c35d99 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -31,7 +31,6 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager
from apache_beam.runners.direct.transform_result import TransformResult
from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access
from apache_beam.transforms import core
-from apache_beam.transforms import sideinputs
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import WindowedValue
from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -54,7 +53,6 @@ class TransformEvaluatorRegistry(object):
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
- sideinputs.CreatePCollectionView: _CreatePCollectionViewEvaluator,
_NativeWrite: _NativeWriteEvaluator,
}
@@ -100,8 +98,7 @@ class TransformEvaluatorRegistry(object):
True if executor should execute applied_ptransform serially.
"""
return isinstance(applied_ptransform.transform,
- (core.GroupByKeyOnly, sideinputs.CreatePCollectionView,
- _NativeWrite))
+ (core.GroupByKeyOnly, _NativeWrite))
class _TransformEvaluator(object):
@@ -444,52 +441,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
self._applied_ptransform, bundles, state, None, None, hold)
-class _CreatePCollectionViewEvaluator(_TransformEvaluator):
- """TransformEvaluator for CreatePCollectionView transform."""
-
- def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs, scoped_metrics_container):
- assert not side_inputs
- super(_CreatePCollectionViewEvaluator, self).__init__(
- evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs, scoped_metrics_container)
-
- @property
- def _is_final_bundle(self):
- return (self._execution_context.watermarks.input_watermark
- == WatermarkManager.WATERMARK_POS_INF)
-
- def start_bundle(self):
- # state: [values]
- self.state = (self._execution_context.existing_state
- if self._execution_context.existing_state else [])
-
- assert len(self._outputs) == 1
- self.output_pcollection = list(self._outputs)[0]
-
- def process_element(self, element):
- self.state.append(element)
-
- def finish_bundle(self):
- if self._is_final_bundle:
- bundle = self._evaluation_context.create_bundle(self.output_pcollection)
-
- view_result = self.state
- for result in view_result:
- bundle.output(result)
-
- bundles = [bundle]
- state = None
- hold = WatermarkManager.WATERMARK_POS_INF
- else:
- bundles = []
- state = self.state
- hold = WatermarkManager.WATERMARK_NEG_INF
-
- return TransformResult(
- self._applied_ptransform, bundles, state, None, None, hold)
-
-
class _NativeWriteEvaluator(_TransformEvaluator):
"""TransformEvaluator for _NativeWrite transform."""
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7a52828..88fdec8 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -879,7 +879,7 @@ class CombineGlobally(PTransform):
else CombineFn.from_callable(self.fn))
default_value = combine_fn.apply([], *self.args, **self.kwargs)
else:
- default_value = pvalue._SINGLETON_NO_DEFAULT # pylint: disable=protected-access
+ default_value = pvalue.AsSingleton._NO_DEFAULT # pylint: disable=protected-access
view = pvalue.AsSingleton(combined, default_value=default_value)
if self.as_view:
return view
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index aca5822..93d751d 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -463,7 +463,7 @@ class PTransformWithSideInputs(PTransform):
'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
'PCollection is to be used.')
self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
- args, kwargs, pvalue.PCollectionView)
+ args, kwargs, pvalue.AsSideInput)
self.raw_side_inputs = args, kwargs
# Prevent name collisions with fns of the form '<function <lambda> at ...>'
@@ -519,7 +519,7 @@ class PTransformWithSideInputs(PTransform):
args, kwargs = self.raw_side_inputs
def element_type(side_input):
- if isinstance(side_input, pvalue.PCollectionView):
+ if isinstance(side_input, pvalue.AsSideInput):
return side_input.element_type
else:
return instance_to_type(side_input)
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 46731bf..1de7bac 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -24,140 +24,8 @@ AsSingleton, AsIter, AsList and AsDict in apache_beam.pvalue.
from __future__ import absolute_import
-from apache_beam import pvalue
-from apache_beam import typehints
-from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms import window
-# Type variables
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-
-
-class CreatePCollectionView(PTransform):
- """Transform to materialize a given PCollectionView in the pipeline.
-
- Important: this transform is an implementation detail and should not be used
- directly by pipeline writers.
- """
-
- def __init__(self, view):
- self.view = view
- super(CreatePCollectionView, self).__init__()
-
- def infer_output_type(self, input_type):
- # TODO(ccy): Figure out if we want to create a new type of type hint, i.e.,
- # typehints.View[...].
- return input_type
-
- def expand(self, pcoll):
- return self.view
-
-
-class ViewAsSingleton(PTransform):
- """Transform to view PCollection as a singleton PCollectionView.
-
- Important: this transform is an implementation detail and should not be used
- directly by pipeline writers. Use pvalue.AsSingleton(...) instead.
- """
-
- def __init__(self, has_default, default_value, label=None):
- if label:
- label = 'ViewAsSingleton(%s)' % label
- super(ViewAsSingleton, self).__init__(label=label)
- self.has_default = has_default
- self.default_value = default_value
-
- def expand(self, pcoll):
- self._check_pcollection(pcoll)
- input_type = pcoll.element_type
- output_type = input_type
- return (pcoll
- | CreatePCollectionView(
- pvalue.SingletonPCollectionView(
- pcoll.pipeline, self.has_default, self.default_value,
- default_window_mapping_fn(pcoll.windowing.windowfn)))
- .with_input_types(input_type)
- .with_output_types(output_type))
-
-
-class ViewAsIterable(PTransform):
- """Transform to view PCollection as an iterable PCollectionView.
-
- Important: this transform is an implementation detail and should not be used
- directly by pipeline writers. Use pvalue.AsIter(...) instead.
- """
-
- def __init__(self, label=None):
- if label:
- label = 'ViewAsIterable(%s)' % label
- super(ViewAsIterable, self).__init__(label=label)
-
- def expand(self, pcoll):
- self._check_pcollection(pcoll)
- input_type = pcoll.element_type
- output_type = typehints.Iterable[input_type]
- return (pcoll
- | CreatePCollectionView(
- pvalue.IterablePCollectionView(
- pcoll.pipeline,
- default_window_mapping_fn(pcoll.windowing.windowfn)))
- .with_input_types(input_type)
- .with_output_types(output_type))
-
-
-class ViewAsList(PTransform):
- """Transform to view PCollection as a list PCollectionView.
-
- Important: this transform is an implementation detail and should not be used
- directly by pipeline writers. Use pvalue.AsList(...) instead.
- """
-
- def __init__(self, label=None):
- if label:
- label = 'ViewAsList(%s)' % label
- super(ViewAsList, self).__init__(label=label)
-
- def expand(self, pcoll):
- self._check_pcollection(pcoll)
- input_type = pcoll.element_type
- output_type = typehints.List[input_type]
- return (pcoll
- | CreatePCollectionView(pvalue.ListPCollectionView(
- pcoll.pipeline,
- default_window_mapping_fn(pcoll.windowing.windowfn)))
- .with_input_types(input_type)
- .with_output_types(output_type))
-
-
-@typehints.with_input_types(typehints.Tuple[K, V])
-@typehints.with_output_types(typehints.Dict[K, V])
-class ViewAsDict(PTransform):
- """Transform to view PCollection as a dict PCollectionView.
-
- Important: this transform is an implementation detail and should not be used
- directly by pipeline writers. Use pvalue.AsDict(...) instead.
- """
-
- def __init__(self, label=None):
- if label:
- label = 'ViewAsDict(%s)' % label
- super(ViewAsDict, self).__init__(label=label)
-
- def expand(self, pcoll):
- self._check_pcollection(pcoll)
- input_type = pcoll.element_type
- key_type, value_type = (
- typehints.trivial_inference.key_value_types(input_type))
- output_type = typehints.Dict[key_type, value_type]
- return (pcoll
- | CreatePCollectionView(
- pvalue.DictPCollectionView(
- pcoll.pipeline,
- default_window_mapping_fn(pcoll.windowing.windowfn)))
- .with_input_types(input_type)
- .with_output_types(output_type))
-
# Top-level function so we can identify it later.
def _global_window_mapping_fn(w, global_window=window.GlobalWindow()):
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 9278f4b..53669de 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -186,8 +186,8 @@ class SideInputsTest(unittest.TestCase):
main_input = pipeline | 'main input' >> beam.Create([1])
side_list = pipeline | 'side list' >> beam.Create(a_list)
side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs)
- results = main_input | 'concatenate' >> beam.FlatMap(
- lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+ results = main_input | 'concatenate' >> beam.Map(
+ lambda x, the_list, the_dict: [x, the_list, the_dict],
beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs))
def matcher(expected_elem, expected_list, expected_pairs):
@@ -205,13 +205,13 @@ class SideInputsTest(unittest.TestCase):
def test_as_singleton_without_unique_labels(self):
# This should succeed as calling beam.pvalue.AsSingleton on the same
# PCollection twice with the same defaults will return the same
- # PCollectionView.
+ # view.
a_list = [2]
pipeline = self.create_pipeline()
main_input = pipeline | 'main input' >> beam.Create([1])
side_list = pipeline | 'side list' >> beam.Create(a_list)
- results = main_input | beam.FlatMap(
- lambda x, s1, s2: [[x, s1, s2]],
+ results = main_input | beam.Map(
+ lambda x, s1, s2: [x, s1, s2],
beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list))
def matcher(expected_elem, expected_singleton):
@@ -226,34 +226,15 @@ class SideInputsTest(unittest.TestCase):
pipeline.run()
@attr('ValidatesRunner')
- def test_as_singleton_with_different_defaults_without_unique_labels(self):
- # This should fail as beam.pvalue.AsSingleton with distinct default values
- # should beam.Create distinct PCollectionViews with the same full_label.
- a_list = [2]
- pipeline = self.create_pipeline()
- main_input = pipeline | 'main input' >> beam.Create([1])
- side_list = pipeline | 'side list' >> beam.Create(a_list)
-
- with self.assertRaises(RuntimeError) as e:
- _ = main_input | beam.FlatMap(
- lambda x, s1, s2: [[x, s1, s2]],
- beam.pvalue.AsSingleton(side_list),
- beam.pvalue.AsSingleton(side_list, default_value=3))
- self.assertTrue(
- e.exception.message.startswith(
- 'Transform "ViewAsSingleton(side list.None)" does not have a '
- 'stable unique label.'))
-
- @attr('ValidatesRunner')
- def test_as_singleton_with_different_defaults_with_unique_labels(self):
+ def test_as_singleton_with_different_defaults(self):
a_list = []
pipeline = self.create_pipeline()
main_input = pipeline | 'main input' >> beam.Create([1])
side_list = pipeline | 'side list' >> beam.Create(a_list)
- results = main_input | beam.FlatMap(
- lambda x, s1, s2: [[x, s1, s2]],
- beam.pvalue.AsSingleton(side_list, default_value=2, label='si1'),
- beam.pvalue.AsSingleton(side_list, default_value=3, label='si2'))
+ results = main_input | beam.Map(
+ lambda x, s1, s2: [x, s1, s2],
+ beam.pvalue.AsSingleton(side_list, default_value=2),
+ beam.pvalue.AsSingleton(side_list, default_value=3))
def matcher(expected_elem, expected_singleton1, expected_singleton2):
def match(actual):
@@ -267,15 +248,15 @@ class SideInputsTest(unittest.TestCase):
pipeline.run()
@attr('ValidatesRunner')
- def test_as_list_without_unique_labels(self):
+ def test_as_list_twice(self):
# This should succeed as calling beam.pvalue.AsList on the same
- # PCollection twice will return the same PCollectionView.
+ # PCollection twice will return the same view.
a_list = [1, 2, 3]
pipeline = self.create_pipeline()
main_input = pipeline | 'main input' >> beam.Create([1])
side_list = pipeline | 'side list' >> beam.Create(a_list)
- results = main_input | beam.FlatMap(
- lambda x, ls1, ls2: [[x, ls1, ls2]],
+ results = main_input | beam.Map(
+ lambda x, ls1, ls2: [x, ls1, ls2],
beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list))
def matcher(expected_elem, expected_list):
@@ -290,37 +271,15 @@ class SideInputsTest(unittest.TestCase):
pipeline.run()
@attr('ValidatesRunner')
- def test_as_list_with_unique_labels(self):
- a_list = [1, 2, 3]
- pipeline = self.create_pipeline()
- main_input = pipeline | 'main input' >> beam.Create([1])
- side_list = pipeline | 'side list' >> beam.Create(a_list)
- results = main_input | beam.FlatMap(
- lambda x, ls1, ls2: [[x, ls1, ls2]],
- beam.pvalue.AsList(side_list),
- beam.pvalue.AsList(side_list, label='label'))
-
- def matcher(expected_elem, expected_list):
- def match(actual):
- [[actual_elem, actual_list1, actual_list2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list1)
- equal_to(expected_list)(actual_list2)
- return match
-
- assert_that(results, matcher(1, [1, 2, 3]))
- pipeline.run()
-
- @attr('ValidatesRunner')
- def test_as_dict_with_unique_labels(self):
+ def test_as_dict_twice(self):
some_kvs = [('a', 1), ('b', 2)]
pipeline = self.create_pipeline()
main_input = pipeline | 'main input' >> beam.Create([1])
side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
- results = main_input | beam.FlatMap(
- lambda x, dct1, dct2: [[x, dct1, dct2]],
+ results = main_input | beam.Map(
+ lambda x, dct1, dct2: [x, dct1, dct2],
beam.pvalue.AsDict(side_kvs),
- beam.pvalue.AsDict(side_kvs, label='label'))
+ beam.pvalue.AsDict(side_kvs))
def matcher(expected_elem, expected_kvs):
def match(actual):
@@ -333,6 +292,20 @@ class SideInputsTest(unittest.TestCase):
assert_that(results, matcher(1, some_kvs))
pipeline.run()
+ @attr('ValidatesRunner')
+ def test_flattened_side_input(self):
+ pipeline = self.create_pipeline()
+ main_input = pipeline | 'main input' >> beam.Create([None])
+ side_input = (
+ pipeline | 'side1' >> beam.Create(['a']),
+ pipeline | 'side2' >> beam.Create(['b'])) | beam.Flatten()
+ results = main_input | beam.FlatMap(
+ lambda _, ab: ab,
+ beam.pvalue.AsList(side_input))
+
+ assert_that(results, equal_to(['a', 'b']))
+ pipeline.run()
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)